dsdb: Quiet CID 1452117 1452119 1452114 (STRAY_SEMICOLON)
[Samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
blob6b0fbe728bce594ed4ce88c34045350ba5cd04a3
/*
   Partitions ldb module

   Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
   Copyright (C) Stefan Metzmacher <metze@samba.org> 2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

/*
 *  Name: ldb
 *
 *  Component: ldb partitions module
 *
 *  Description: Implement LDAP partitions
 *
 *  Author: Andrew Bartlett
 *  Author: Stefan Metzmacher
 */
#include "dsdb/samdb/ldb_modules/partition.h"
/* One sub-request together with the module it has to be handed to. */
struct part_request {
	struct ldb_module *module;	/* module the sub-request is sent through */
	struct ldb_request *req;	/* the sub-request itself */
};
/* Per-request state used while a request is fanned out to partitions. */
struct partition_context {
	struct ldb_module *module;	/* the partition module instance */
	struct ldb_request *req;	/* the caller's original request */

	struct part_request *part_req;	/* array of prepared sub-requests */
	unsigned int num_requests;	/* number of entries in part_req */
	unsigned int finished_requests;	/* sub-requests that reported DONE */

	const char **referrals;		/* NULL-terminated referral list, or NULL */
};
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
52 struct partition_context *ac;
54 ac = talloc_zero(req, struct partition_context);
55 if (ac == NULL) {
56 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57 return NULL;
60 ac->module = module;
61 ac->req = req;
63 return ac;
67 * helper functions to call the next module in chain
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
71 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72 const struct dsdb_control_current_partition *partition = NULL;
73 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74 if (partition_ctrl) {
75 partition = talloc_get_type(partition_ctrl->data,
76 struct dsdb_control_current_partition);
79 if (partition != NULL) {
80 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81 ldb_dn_get_linearized(partition->dn));
82 } else {
83 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
87 return ldb_next_request(module, request);
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91 struct ldb_dn *dn,
92 struct ldb_request *req)
94 unsigned int i;
95 struct ldb_control *partition_ctrl;
97 /* see if the request has the partition DN specified in a
98 * control. The repl_meta_data module can specify this to
99 * ensure that replication happens to the right partition
101 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102 if (partition_ctrl) {
103 const struct dsdb_control_current_partition *partition;
104 partition = talloc_get_type(partition_ctrl->data,
105 struct dsdb_control_current_partition);
106 if (partition != NULL) {
107 dn = partition->dn;
111 if (dn == NULL) {
112 return NULL;
115 /* Look at base DN */
116 /* Figure out which partition it is under */
117 /* Skip the lot if 'data' isn't here yet (initialisation) */
118 for (i=0; data && data->partitions && data->partitions[i]; i++) {
119 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120 return data->partitions[i];
124 return NULL;
128 * fire the caller's callback for every entry, but only send 'done' once.
130 static int partition_req_callback(struct ldb_request *req,
131 struct ldb_reply *ares)
133 struct partition_context *ac;
134 struct ldb_module *module;
135 struct ldb_request *nreq;
136 int ret;
137 struct ldb_control *partition_ctrl;
139 ac = talloc_get_type(req->context, struct partition_context);
141 if (!ares) {
142 return ldb_module_done(ac->req, NULL, NULL,
143 LDB_ERR_OPERATIONS_ERROR);
146 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147 if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148 /* If we didn't fan this request out to mulitple partitions,
149 * or this is an individual search result, we can
150 * deterministically tell the caller what partition this was
151 * written to (repl_meta_data likes to know) */
152 ret = ldb_reply_add_control(ares,
153 DSDB_CONTROL_CURRENT_PARTITION_OID,
154 false, partition_ctrl->data);
155 if (ret != LDB_SUCCESS) {
156 return ldb_module_done(ac->req, NULL, NULL,
157 ret);
161 if (ares->error != LDB_SUCCESS) {
162 return ldb_module_done(ac->req, ares->controls,
163 ares->response, ares->error);
166 switch (ares->type) {
167 case LDB_REPLY_REFERRAL:
168 return ldb_module_send_referral(ac->req, ares->referral);
170 case LDB_REPLY_ENTRY:
171 if (ac->req->operation != LDB_SEARCH) {
172 ldb_set_errstring(ldb_module_get_ctx(ac->module),
173 "partition_req_callback:"
174 " Unsupported reply type for this request");
175 return ldb_module_done(ac->req, NULL, NULL,
176 LDB_ERR_OPERATIONS_ERROR);
179 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
181 case LDB_REPLY_DONE:
182 if (ac->req->operation == LDB_EXTENDED) {
183 /* FIXME: check for ares->response, replmd does not fill it ! */
184 if (ares->response) {
185 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186 ldb_set_errstring(ldb_module_get_ctx(ac->module),
187 "partition_req_callback:"
188 " Unknown extended reply, "
189 "only supports START_TLS");
190 talloc_free(ares);
191 return ldb_module_done(ac->req, NULL, NULL,
192 LDB_ERR_OPERATIONS_ERROR);
197 ac->finished_requests++;
198 if (ac->finished_requests == ac->num_requests) {
199 /* Send back referrals if they do exist (search ops) */
200 if (ac->referrals != NULL) {
201 const char **ref;
202 for (ref = ac->referrals; *ref != NULL; ++ref) {
203 ret = ldb_module_send_referral(ac->req,
204 talloc_strdup(ac->req, *ref));
205 if (ret != LDB_SUCCESS) {
206 return ldb_module_done(ac->req, NULL, NULL,
207 ret);
212 /* this was the last one, call callback */
213 return ldb_module_done(ac->req, ares->controls,
214 ares->response,
215 ares->error);
218 /* not the last, now call the next one */
219 module = ac->part_req[ac->finished_requests].module;
220 nreq = ac->part_req[ac->finished_requests].req;
222 ret = partition_request(module, nreq);
223 if (ret != LDB_SUCCESS) {
224 talloc_free(ares);
225 return ldb_module_done(ac->req, NULL, NULL, ret);
228 break;
231 talloc_free(ares);
232 return LDB_SUCCESS;
235 static int partition_prep_request(struct partition_context *ac,
236 struct dsdb_partition *partition)
238 int ret;
239 struct ldb_request *req;
240 struct ldb_control *partition_ctrl = NULL;
242 ac->part_req = talloc_realloc(ac, ac->part_req,
243 struct part_request,
244 ac->num_requests + 1);
245 if (ac->part_req == NULL) {
246 return ldb_oom(ldb_module_get_ctx(ac->module));
249 switch (ac->req->operation) {
250 case LDB_SEARCH:
251 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
252 ac->part_req,
253 ac->req->op.search.base,
254 ac->req->op.search.scope,
255 ac->req->op.search.tree,
256 ac->req->op.search.attrs,
257 ac->req->controls,
258 ac, partition_req_callback,
259 ac->req);
260 LDB_REQ_SET_LOCATION(req);
261 break;
262 case LDB_ADD:
263 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
264 ac->req->op.add.message,
265 ac->req->controls,
266 ac, partition_req_callback,
267 ac->req);
268 LDB_REQ_SET_LOCATION(req);
269 break;
270 case LDB_MODIFY:
271 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
272 ac->req->op.mod.message,
273 ac->req->controls,
274 ac, partition_req_callback,
275 ac->req);
276 LDB_REQ_SET_LOCATION(req);
277 break;
278 case LDB_DELETE:
279 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
280 ac->req->op.del.dn,
281 ac->req->controls,
282 ac, partition_req_callback,
283 ac->req);
284 LDB_REQ_SET_LOCATION(req);
285 break;
286 case LDB_RENAME:
287 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
288 ac->req->op.rename.olddn,
289 ac->req->op.rename.newdn,
290 ac->req->controls,
291 ac, partition_req_callback,
292 ac->req);
293 LDB_REQ_SET_LOCATION(req);
294 break;
295 case LDB_EXTENDED:
296 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
297 ac->part_req,
298 ac->req->op.extended.oid,
299 ac->req->op.extended.data,
300 ac->req->controls,
301 ac, partition_req_callback,
302 ac->req);
303 LDB_REQ_SET_LOCATION(req);
304 break;
305 default:
306 ldb_set_errstring(ldb_module_get_ctx(ac->module),
307 "Unsupported request type!");
308 ret = LDB_ERR_UNWILLING_TO_PERFORM;
311 if (ret != LDB_SUCCESS) {
312 return ret;
315 ac->part_req[ac->num_requests].req = req;
317 if (ac->req->controls) {
318 /* Duplicate everything beside the current partition control */
319 partition_ctrl = ldb_request_get_control(ac->req,
320 DSDB_CONTROL_CURRENT_PARTITION_OID);
321 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
322 return ldb_module_oom(ac->module);
326 if (partition) {
327 void *part_data = partition->ctrl;
329 ac->part_req[ac->num_requests].module = partition->module;
331 if (partition_ctrl != NULL) {
332 if (partition_ctrl->data != NULL) {
333 part_data = partition_ctrl->data;
337 * If the provided current partition control is without
338 * data then use the calculated one.
340 ret = ldb_request_add_control(req,
341 DSDB_CONTROL_CURRENT_PARTITION_OID,
342 false, part_data);
343 if (ret != LDB_SUCCESS) {
344 return ret;
348 if (req->operation == LDB_SEARCH) {
349 /* If the search is for 'more' than this partition,
350 * then change the basedn, so a remote LDAP server
351 * doesn't object */
352 if (ldb_dn_compare_base(partition->ctrl->dn,
353 req->op.search.base) != 0) {
354 req->op.search.base = partition->ctrl->dn;
358 } else {
359 /* make sure you put the module here, or
360 * or ldb_next_request() will skip a module */
361 ac->part_req[ac->num_requests].module = ac->module;
364 ac->num_requests++;
366 return LDB_SUCCESS;
369 static int partition_call_first(struct partition_context *ac)
371 return partition_request(ac->part_req[0].module, ac->part_req[0].req);
375 * Send a request down to all the partitions (but not the sam.ldb file)
377 static int partition_send_all(struct ldb_module *module,
378 struct partition_context *ac,
379 struct ldb_request *req)
381 unsigned int i;
382 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
383 struct partition_private_data);
384 int ret;
386 for (i=0; data && data->partitions && data->partitions[i]; i++) {
387 ret = partition_prep_request(ac, data->partitions[i]);
388 if (ret != LDB_SUCCESS) {
389 return ret;
393 /* fire the first one */
394 return partition_call_first(ac);
/* State handed to partition_copy_all_callback_handler() via req->context. */
struct partition_copy_context {
	struct ldb_module *module;			/* the partition module */
	struct partition_context *partition_context;	/* per-request context */
	struct ldb_request *request;			/* the caller's original request */
	struct ldb_dn *dn;				/* the special DN being changed */
};
405 * A special DN has been updated in the primary partition. Now propagate those
406 * changes to the remaining partitions.
408 * Note: that the operations are asynchronous and this function is called
409 * from partition_copy_all_callback_handler in response to an async
410 * callback.
412 static int partition_copy_all_callback_action(
413 struct ldb_module *module,
414 struct partition_context *ac,
415 struct ldb_request *req,
416 struct ldb_dn *dn)
420 unsigned int i;
421 struct partition_private_data *data =
422 talloc_get_type(
423 ldb_module_get_private(module),
424 struct partition_private_data);
425 int search_ret;
426 struct ldb_result *res;
427 /* now fetch the resulting object, and then copy it to all the
428 * other partitions. We need this approach to cope with the
429 * partitions getting out of sync. If for example the
430 * @ATTRIBUTES object exists on one partition but not the
431 * others then just doing each of the partitions in turn will
432 * lead to an error
434 search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
435 if (search_ret != LDB_SUCCESS) {
436 return search_ret;
439 /* now delete the object in the other partitions, if requried
441 if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
442 for (i=0; data->partitions && data->partitions[i]; i++) {
443 int pret;
444 pret = dsdb_module_del(data->partitions[i]->module,
446 DSDB_FLAG_NEXT_MODULE,
447 req);
448 if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
449 /* we should only get success or no
450 such object from the other partitions */
451 return pret;
455 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
458 /* now add/modify in the other partitions */
459 for (i=0; data->partitions && data->partitions[i]; i++) {
460 struct ldb_message *modify_msg = NULL;
461 int pret;
462 unsigned int el_idx;
464 pret = dsdb_module_add(data->partitions[i]->module,
465 res->msgs[0],
466 DSDB_FLAG_NEXT_MODULE,
467 req);
468 if (pret == LDB_SUCCESS) {
469 continue;
472 if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
473 return pret;
476 modify_msg = ldb_msg_copy(req, res->msgs[0]);
477 if (modify_msg == NULL) {
478 return ldb_module_oom(module);
482 * mark all the message elements as
483 * LDB_FLAG_MOD_REPLACE
485 for (el_idx=0;
486 el_idx < modify_msg->num_elements;
487 el_idx++) {
488 modify_msg->elements[el_idx].flags
489 = LDB_FLAG_MOD_REPLACE;
492 if (req->operation == LDB_MODIFY) {
493 const struct ldb_message *req_msg = req->op.mod.message;
495 * mark elements to be removed, if there were
496 * deleted entirely above we need to delete
497 * them here too
499 for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
500 if (req_msg->elements[el_idx].flags & LDB_FLAG_MOD_DELETE
501 || ((req_msg->elements[el_idx].flags & LDB_FLAG_MOD_REPLACE) &&
502 req_msg->elements[el_idx].num_values == 0)) {
503 if (ldb_msg_find_element(modify_msg,
504 req_msg->elements[el_idx].name) != NULL) {
505 continue;
507 pret = ldb_msg_add_empty(
508 modify_msg,
509 req_msg->elements[el_idx].name,
510 LDB_FLAG_MOD_REPLACE,
511 NULL);
512 if (pret != LDB_SUCCESS) {
513 return pret;
519 pret = dsdb_module_modify(data->partitions[i]->module,
520 modify_msg,
521 DSDB_FLAG_NEXT_MODULE,
522 req);
524 if (pret != LDB_SUCCESS) {
525 return pret;
529 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
534 * @brief call back function for the ldb operations on special DN's.
536 * As the LDB operations are async, and we wish to use the result
537 * the operations, a callback needs to be registered to process the results
538 * of the LDB operations.
540 * @param req the ldb request
541 * @param res the result of the operation
543 * @return the LDB_STATUS
545 static int partition_copy_all_callback_handler(
546 struct ldb_request *req,
547 struct ldb_reply *ares)
549 struct partition_copy_context *ac = NULL;
551 ac = talloc_get_type(
552 req->context,
553 struct partition_copy_context);
555 if (!ares) {
556 return ldb_module_done(
557 ac->request,
558 NULL,
559 NULL,
560 LDB_ERR_OPERATIONS_ERROR);
563 /* pass on to the callback */
564 switch (ares->type) {
565 case LDB_REPLY_ENTRY:
566 return ldb_module_send_entry(
567 ac->request,
568 ares->message,
569 ares->controls);
571 case LDB_REPLY_REFERRAL:
572 return ldb_module_send_referral(
573 ac->request,
574 ares->referral);
576 case LDB_REPLY_DONE: {
577 int error = ares->error;
578 if (error == LDB_SUCCESS) {
579 error = partition_copy_all_callback_action(
580 ac->module,
581 ac->partition_context,
582 ac->request,
583 ac->dn);
585 return ldb_module_done(
586 ac->request,
587 ares->controls,
588 ares->response,
589 error);
592 default:
593 /* Can't happen */
594 return LDB_ERR_OPERATIONS_ERROR;
599 * send an operation to the top partition, then copy the resulting
600 * object to all other partitions.
602 static int partition_copy_all(
603 struct ldb_module *module,
604 struct partition_context *partition_context,
605 struct ldb_request *req,
606 struct ldb_dn *dn)
608 struct ldb_request *new_req = NULL;
609 struct ldb_context *ldb = NULL;
610 struct partition_copy_context *context = NULL;
612 int ret;
614 ldb = ldb_module_get_ctx(module);
616 context = talloc_zero(req, struct partition_copy_context);
617 if (context == NULL) {
618 return ldb_oom(ldb);
620 context->module = module;
621 context->request = req;
622 context->dn = dn;
623 context->partition_context = partition_context;
625 switch (req->operation) {
626 case LDB_ADD:
627 ret = ldb_build_add_req(
628 &new_req,
629 ldb,
630 req,
631 req->op.add.message,
632 req->controls,
633 context,
634 partition_copy_all_callback_handler,
635 req);
636 break;
637 case LDB_MODIFY:
638 ret = ldb_build_mod_req(
639 &new_req,
640 ldb,
641 req,
642 req->op.mod.message,
643 req->controls,
644 context,
645 partition_copy_all_callback_handler,
646 req);
647 break;
648 case LDB_DELETE:
649 ret = ldb_build_del_req(
650 &new_req,
651 ldb,
652 req,
653 req->op.del.dn,
654 req->controls,
655 context,
656 partition_copy_all_callback_handler,
657 req);
658 break;
659 case LDB_RENAME:
660 ret = ldb_build_rename_req(
661 &new_req,
662 ldb,
663 req,
664 req->op.rename.olddn,
665 req->op.rename.newdn,
666 req->controls,
667 context,
668 partition_copy_all_callback_handler,
669 req);
670 break;
671 default:
673 * Shouldn't happen.
675 ldb_debug(
676 ldb,
677 LDB_DEBUG_ERROR,
678 "Unexpected operation type (%d)\n", req->operation);
679 ret = LDB_ERR_OPERATIONS_ERROR;
680 break;
682 if (ret != LDB_SUCCESS) {
683 return ret;
685 return ldb_next_request(module, new_req);
688 * Figure out which backend a request needs to be aimed at. Some
689 * requests must be replicated to all backends
691 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
693 struct partition_context *ac;
694 unsigned int i;
695 int ret;
696 struct dsdb_partition *partition;
697 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
698 struct partition_private_data);
700 /* if we aren't initialised yet go further */
701 if (!data || !data->partitions) {
702 return ldb_next_request(module, req);
705 if (ldb_dn_is_special(dn)) {
706 /* Is this a special DN, we need to replicate to every backend? */
707 for (i=0; data->replicate && data->replicate[i]; i++) {
708 if (ldb_dn_compare(data->replicate[i],
709 dn) == 0) {
711 ac = partition_init_ctx(module, req);
712 if (!ac) {
713 return ldb_operr(ldb_module_get_ctx(module));
716 return partition_copy_all(module, ac, req, dn);
721 /* Otherwise, we need to find the partition to fire it to */
723 /* Find partition */
724 partition = find_partition(data, dn, req);
725 if (!partition) {
727 * if we haven't found a matching partition
728 * pass the request to the main ldb
730 * TODO: we should maybe return an error here
731 * if it's not a special dn
734 return ldb_next_request(module, req);
737 ac = partition_init_ctx(module, req);
738 if (!ac) {
739 return ldb_operr(ldb_module_get_ctx(module));
742 /* we need to add a control but we never touch the original request */
743 ret = partition_prep_request(ac, partition);
744 if (ret != LDB_SUCCESS) {
745 return ret;
748 /* fire the first one */
749 return partition_call_first(ac);
752 /* search */
753 static int partition_search(struct ldb_module *module, struct ldb_request *req)
755 struct ldb_control **saved_controls;
756 /* Find backend */
757 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
758 struct partition_private_data);
759 struct partition_context *ac;
760 struct ldb_context *ldb;
761 struct loadparm_context *lp_ctx;
763 struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
764 struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
765 struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
767 struct ldb_search_options_control *search_options = NULL;
768 struct dsdb_partition *p;
769 unsigned int i, j;
770 int ret;
771 bool domain_scope = false, phantom_root = false;
773 p = find_partition(data, NULL, req);
774 if (p != NULL) {
775 /* the caller specified what partition they want the
776 * search - just pass it on
778 return ldb_next_request(p->module, req);
781 /* Get back the search options from the search control, and mark it as
782 * non-critical (to make backends and also dcpromo happy).
784 if (search_control) {
785 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
786 search_control->critical = 0;
790 /* Remove the "domain_scope" control, so we don't confuse a backend
791 * server */
792 if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
793 return ldb_oom(ldb_module_get_ctx(module));
796 /* if we aren't initialised yet go further */
797 if (!data || !data->partitions) {
798 return ldb_next_request(module, req);
801 /* Special DNs without specified partition should go further */
802 if (ldb_dn_is_special(req->op.search.base)) {
803 return ldb_next_request(module, req);
806 /* Locate the options */
807 domain_scope = (search_options
808 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
809 || domain_scope_control;
810 phantom_root = search_options
811 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
813 /* Remove handled options from the search control flag */
814 if (search_options) {
815 search_options->search_options = search_options->search_options
816 & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
817 & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
820 ac = partition_init_ctx(module, req);
821 if (!ac) {
822 return ldb_operr(ldb_module_get_ctx(module));
825 ldb = ldb_module_get_ctx(ac->module);
826 lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
827 struct loadparm_context);
829 /* Search from the base DN */
830 if (ldb_dn_is_null(req->op.search.base)) {
831 if (!phantom_root) {
832 return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
834 return partition_send_all(module, ac, req);
837 for (i=0; data->partitions[i]; i++) {
838 bool match = false, stop = false;
840 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
841 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
842 req->op.search.base) == 0) {
843 /* base DN is in a partial replica
844 with the NO_GLOBAL_CATALOG
845 control. This partition is invisible */
846 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
847 continue;
851 if (phantom_root) {
852 /* Phantom root: Find all partitions under the
853 * search base. We match if:
855 * 1) the DN we are looking for exactly matches a
856 * certain partition and always stop
857 * 2) the DN we are looking for is a parent of certain
858 * partitions and it isn't a scope base search
859 * 3) the DN we are looking for is a child of a certain
860 * partition and always stop
861 * - we don't need to go any further up in the
862 * hierarchy!
864 if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
865 req->op.search.base) == 0) {
866 match = true;
867 stop = true;
869 if (!match &&
870 (ldb_dn_compare_base(req->op.search.base,
871 data->partitions[i]->ctrl->dn) == 0 &&
872 req->op.search.scope != LDB_SCOPE_BASE)) {
873 match = true;
875 if (!match &&
876 ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
877 req->op.search.base) == 0) {
878 match = true;
879 stop = true; /* note that this relies on partition ordering */
881 } else {
882 /* Domain scope: Find all partitions under the search
883 * base.
885 * We generate referral candidates if we haven't
886 * specified the domain scope control, haven't a base
887 * search* scope and the DN we are looking for is a real
888 * predecessor of certain partitions. When a new
889 * referral candidate is nearer to the DN than an
890 * existing one delete the latter (we want to have only
891 * the closest ones). When we checked this for all
892 * candidates we have the final referrals.
894 * We match if the DN we are looking for is a child of
895 * a certain partition or the partition
896 * DN itself - we don't need to go any further
897 * up in the hierarchy!
899 if ((!domain_scope) &&
900 (req->op.search.scope != LDB_SCOPE_BASE) &&
901 (ldb_dn_compare_base(req->op.search.base,
902 data->partitions[i]->ctrl->dn) == 0) &&
903 (ldb_dn_compare(req->op.search.base,
904 data->partitions[i]->ctrl->dn) != 0)) {
905 const char *scheme = ldb_get_opaque(
906 ldb, LDAP_REFERRAL_SCHEME_OPAQUE);
907 char *ref = talloc_asprintf(
909 "%s://%s/%s%s",
910 scheme == NULL ? "ldap" : scheme,
911 lpcfg_dnsdomain(lp_ctx),
912 ldb_dn_get_linearized(
913 data->partitions[i]->ctrl->dn),
914 req->op.search.scope ==
915 LDB_SCOPE_ONELEVEL ? "??base" : "");
917 if (ref == NULL) {
918 return ldb_oom(ldb);
921 /* Initialise the referrals list */
922 if (ac->referrals == NULL) {
923 char **l = str_list_make_empty(ac);
924 ac->referrals = discard_const_p(const char *, l);
925 if (ac->referrals == NULL) {
926 return ldb_oom(ldb);
930 /* Check if the new referral candidate is
931 * closer to the base DN than already
932 * saved ones and delete the latters */
933 j = 0;
934 while (ac->referrals[j] != NULL) {
935 if (strstr(ac->referrals[j],
936 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
937 str_list_remove(ac->referrals,
938 ac->referrals[j]);
939 } else {
940 ++j;
944 /* Add our new candidate */
945 ac->referrals = str_list_add(ac->referrals, ref);
947 talloc_free(ref);
949 if (ac->referrals == NULL) {
950 return ldb_oom(ldb);
953 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
954 match = true;
955 stop = true; /* note that this relies on partition ordering */
959 if (match) {
960 ret = partition_prep_request(ac, data->partitions[i]);
961 if (ret != LDB_SUCCESS) {
962 return ret;
966 if (stop) break;
969 /* Perhaps we didn't match any partitions. Try the main partition */
970 if (ac->num_requests == 0) {
971 talloc_free(ac);
972 return ldb_next_request(module, req);
975 /* fire the first one */
976 return partition_call_first(ac);
979 /* add */
980 static int partition_add(struct ldb_module *module, struct ldb_request *req)
982 return partition_replicate(module, req, req->op.add.message->dn);
985 /* modify */
986 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
988 return partition_replicate(module, req, req->op.mod.message->dn);
991 /* delete */
992 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
994 return partition_replicate(module, req, req->op.del.dn);
997 /* rename */
998 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
1000 /* Find backend */
1001 struct dsdb_partition *backend, *backend2;
1003 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1004 struct partition_private_data);
1006 /* Skip the lot if 'data' isn't here yet (initialisation) */
1007 if (!data) {
1008 return ldb_operr(ldb_module_get_ctx(module));
1011 backend = find_partition(data, req->op.rename.olddn, req);
1012 backend2 = find_partition(data, req->op.rename.newdn, req);
1014 if ((backend && !backend2) || (!backend && backend2)) {
1015 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1018 if (backend != backend2) {
1019 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1020 "Cannot rename from %s in %s to %s in %s: %s",
1021 ldb_dn_get_linearized(req->op.rename.olddn),
1022 ldb_dn_get_linearized(backend->ctrl->dn),
1023 ldb_dn_get_linearized(req->op.rename.newdn),
1024 ldb_dn_get_linearized(backend2->ctrl->dn),
1025 ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
1026 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1029 return partition_replicate(module, req, req->op.rename.olddn);
1032 /* start a transaction */
1033 int partition_start_trans(struct ldb_module *module)
1035 int i = 0;
1036 int ret = 0;
1037 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1038 struct partition_private_data);
1039 /* Look at base DN */
1040 /* Figure out which partition it is under */
1041 /* Skip the lot if 'data' isn't here yet (initialization) */
1042 if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
1043 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
1047 * We start a transaction on metadata.tdb first and end it last in
1048 * end_trans. This makes locking semantics follow TDB rather than MDB,
1049 * and effectively locks all partitions at once.
1050 * Detail:
1051 * Samba AD is special in that the partitions module (this file)
1052 * combines multiple independently locked databases into one overall
1053 * transaction. Changes across multiple partition DBs in a single
1054 * transaction must ALL be either visible or invisible.
1055 * The way this is achieved is by taking out a write lock on
1056 * metadata.tdb at the start of prepare_commit, while unlocking it at
1057 * the end of end_trans. This is matched by read_lock, ensuring it
1058 * can't progress until that write lock is released.
1060 * metadata.tdb needs to be a TDB file because MDB uses independent
1061 * locks, which means a read lock and a write lock can be held at the
1062 * same time, whereas in TDB, the two locks block each other. The TDB
1063 * behaviour is required to implement the functionality described
1064 * above.
1066 * An important additional detail here is that if prepare_commit is
1067 * called on a TDB without any changes being made, no write lock is
1068 * taken. We address this by storing a sequence number in metadata.tdb
1069 * which is updated every time a replicated attribute is modified.
1070 * The possibility of a few unreplicated attributes being out of date
1071 * turns out not to be a problem.
1072 * For this reason, a lock on sam.ldb (which is a TDB) won't achieve
1073 * the same end as locking metadata.tdb, unless we made a modification
1074 * to the @ records found there before every prepare_commit.
1076 ret = partition_metadata_start_trans(module);
1077 if (ret != LDB_SUCCESS) {
1078 return ret;
1081 ret = ldb_next_start_trans(module);
1082 if (ret != LDB_SUCCESS) {
1083 partition_metadata_del_trans(module);
1084 return ret;
1087 ret = partition_reload_if_required(module, data, NULL);
1088 if (ret != LDB_SUCCESS) {
1089 ldb_next_del_trans(module);
1090 partition_metadata_del_trans(module);
1091 return ret;
1095 * The following per partition locks are required mostly because TDB
1096 * and MDB require locks before read and write ops are permitted.
1098 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1099 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1100 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
1101 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1103 ret = ldb_next_start_trans(data->partitions[i]->module);
1104 if (ret != LDB_SUCCESS) {
1105 /* Back it out, if it fails on one */
1106 for (i--; i >= 0; i--) {
1107 ldb_next_del_trans(data->partitions[i]->module);
1109 ldb_next_del_trans(module);
1110 partition_metadata_del_trans(module);
1111 return ret;
1115 data->in_transaction++;
1117 return LDB_SUCCESS;
1120 /* prepare for a commit */
1121 int partition_prepare_commit(struct ldb_module *module)
1123 unsigned int i;
1124 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1125 struct partition_private_data);
1126 int ret;
1129 * Order of prepare_commit calls must match that in
1130 * partition_start_trans. See comment in that function for detail.
1132 ret = partition_metadata_prepare_commit(module);
1133 if (ret != LDB_SUCCESS) {
1134 return ret;
1137 ret = ldb_next_prepare_commit(module);
1138 if (ret != LDB_SUCCESS) {
1139 return ret;
1142 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1143 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1144 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
1145 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1147 ret = ldb_next_prepare_commit(data->partitions[i]->module);
1148 if (ret != LDB_SUCCESS) {
1149 ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
1150 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1151 ldb_errstring(ldb_module_get_ctx(module)));
1152 return ret;
1156 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1157 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
1160 return LDB_SUCCESS;
1164 /* end a transaction */
1165 int partition_end_trans(struct ldb_module *module)
1167 int ret, ret2;
1168 int i;
1169 struct ldb_context *ldb = ldb_module_get_ctx(module);
1170 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1171 struct partition_private_data);
1172 bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1174 ret = LDB_SUCCESS;
1176 if (data->in_transaction == 0) {
1177 DEBUG(0,("partition end transaction mismatch\n"));
1178 ret = LDB_ERR_OPERATIONS_ERROR;
1179 } else {
1180 data->in_transaction--;
1184 * Order of end_trans calls must be the reverse of that in
1185 * partition_start_trans. See comment in that function for detail.
1187 if (data && data->partitions) {
1188 /* Just counting the partitions */
1189 for (i=0; data->partitions[i]; i++) {}
1191 /* now walk them backwards */
1192 for (i--; i>=0; i--) {
1193 struct dsdb_partition *p = data->partitions[i];
1194 if (trace) {
1195 ldb_debug(ldb,
1196 LDB_DEBUG_TRACE,
1197 "partition_end_trans() -> %s",
1198 ldb_dn_get_linearized(p->ctrl->dn));
1200 ret2 = ldb_next_end_trans(p->module);
1201 if (ret2 != LDB_SUCCESS) {
1202 ldb_asprintf_errstring(ldb,
1203 "end_trans error on %s: %s",
1204 ldb_dn_get_linearized(p->ctrl->dn),
1205 ldb_errstring(ldb));
1206 ret = ret2;
1211 if (trace) {
1212 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
1214 ret2 = ldb_next_end_trans(module);
1215 if (ret2 != LDB_SUCCESS) {
1216 ret = ret2;
1219 ret2 = partition_metadata_end_trans(module);
1220 if (ret2 != LDB_SUCCESS) {
1221 ret = ret2;
1224 return ret;
1227 /* delete a transaction */
1228 int partition_del_trans(struct ldb_module *module)
1230 int ret, final_ret = LDB_SUCCESS;
1231 int i;
1232 struct ldb_context *ldb = ldb_module_get_ctx(module);
1233 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1234 struct partition_private_data);
1235 bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1237 if (data == NULL) {
1238 DEBUG(0,("partion delete transaction with no private data\n"));
1239 return ldb_operr(ldb);
1243 * Order of del_trans calls must be the reverse of that in
1244 * partition_start_trans. See comment in that function for detail.
1246 if (data->partitions) {
1247 /* Just counting the partitions */
1248 for (i=0; data->partitions[i]; i++) {}
1250 /* now walk them backwards */
1251 for (i--; i>=0; i--) {
1252 struct dsdb_partition *p = data->partitions[i];
1253 if (trace) {
1254 ldb_debug(ldb,
1255 LDB_DEBUG_TRACE,
1256 "partition_del_trans() -> %s",
1257 ldb_dn_get_linearized(p->ctrl->dn));
1259 ret = ldb_next_del_trans(p->module);
1260 if (ret != LDB_SUCCESS) {
1261 ldb_asprintf_errstring(ldb,
1262 "del_trans error on %s: %s",
1263 ldb_dn_get_linearized(p->ctrl->dn),
1264 ldb_errstring(ldb));
1265 final_ret = ret;
1270 if (trace) {
1271 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
1273 ret = ldb_next_del_trans(module);
1274 if (ret != LDB_SUCCESS) {
1275 final_ret = ret;
1278 ret = partition_metadata_del_trans(module);
1279 if (ret != LDB_SUCCESS) {
1280 final_ret = ret;
1283 if (data->in_transaction == 0) {
1284 DEBUG(0,("partition del transaction mismatch\n"));
1285 return ldb_operr(ldb_module_get_ctx(module));
1287 data->in_transaction--;
1289 return final_ret;
1292 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx,
1293 uint64_t *seq_number,
1294 struct ldb_request *parent)
1296 int ret;
1297 struct ldb_result *res;
1298 struct ldb_seqnum_request *tseq;
1299 struct ldb_seqnum_result *seqr;
1301 tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
1302 if (tseq == NULL) {
1303 return ldb_oom(ldb_module_get_ctx(module));
1305 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1307 ret = dsdb_module_extended(module, tseq, &res,
1308 LDB_EXTENDED_SEQUENCE_NUMBER,
1309 tseq,
1310 DSDB_FLAG_NEXT_MODULE,
1311 parent);
1312 if (ret != LDB_SUCCESS) {
1313 talloc_free(tseq);
1314 return ret;
1317 seqr = talloc_get_type_abort(res->extended->data,
1318 struct ldb_seqnum_result);
1319 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1320 talloc_free(res);
1321 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1322 "Primary backend in partition module returned a timestamp based seq");
1325 *seq_number = seqr->seq_num;
1326 talloc_free(tseq);
1327 return LDB_SUCCESS;
1332 * Older version of sequence number as sum of sequence numbers for each partition
1334 int partition_sequence_number_from_partitions(struct ldb_module *module,
1335 uint64_t *seqr)
1337 int ret;
1338 unsigned int i;
1339 uint64_t seq_number = 0;
1340 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1341 struct partition_private_data);
1343 ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1344 if (ret != LDB_SUCCESS) {
1345 return ret;
1348 /* Skip the lot if 'data' isn't here yet (initialisation) */
1349 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1350 struct ldb_seqnum_request *tseq;
1351 struct ldb_seqnum_result *tseqr;
1352 struct ldb_request *treq;
1353 struct ldb_result *res = talloc_zero(data, struct ldb_result);
1354 if (res == NULL) {
1355 return ldb_oom(ldb_module_get_ctx(module));
1357 tseq = talloc_zero(res, struct ldb_seqnum_request);
1358 if (tseq == NULL) {
1359 talloc_free(res);
1360 return ldb_oom(ldb_module_get_ctx(module));
1362 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1364 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1365 LDB_EXTENDED_SEQUENCE_NUMBER,
1366 tseq,
1367 NULL,
1368 res,
1369 ldb_extended_default_callback,
1370 NULL);
1371 LDB_REQ_SET_LOCATION(treq);
1372 if (ret != LDB_SUCCESS) {
1373 talloc_free(res);
1374 return ret;
1377 ret = partition_request(data->partitions[i]->module, treq);
1378 if (ret != LDB_SUCCESS) {
1379 talloc_free(res);
1380 return ret;
1382 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1383 if (ret != LDB_SUCCESS) {
1384 talloc_free(res);
1385 return ret;
1387 tseqr = talloc_get_type(res->extended->data,
1388 struct ldb_seqnum_result);
1389 seq_number += tseqr->seq_num;
1390 talloc_free(res);
1393 *seqr = seq_number;
1394 return LDB_SUCCESS;
1399 * Newer version of sequence number using metadata tdb
1401 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1403 struct ldb_extended *ext;
1404 struct ldb_seqnum_request *seq;
1405 struct ldb_seqnum_result *seqr;
1406 uint64_t seq_number;
1407 int ret;
1409 seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1410 switch (seq->type) {
1411 case LDB_SEQ_NEXT:
1412 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1413 if (ret != LDB_SUCCESS) {
1414 return ret;
1416 break;
1418 case LDB_SEQ_HIGHEST_SEQ:
1419 ret = partition_metadata_sequence_number(module, &seq_number);
1420 if (ret != LDB_SUCCESS) {
1421 return ret;
1423 break;
1425 case LDB_SEQ_HIGHEST_TIMESTAMP:
1426 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1427 "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1430 ext = talloc_zero(req, struct ldb_extended);
1431 if (!ext) {
1432 return ldb_module_oom(module);
1434 seqr = talloc_zero(ext, struct ldb_seqnum_result);
1435 if (seqr == NULL) {
1436 talloc_free(ext);
1437 return ldb_module_oom(module);
1439 ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1440 ext->data = seqr;
1442 seqr->seq_num = seq_number;
1443 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1445 /* send request done */
1446 return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1449 /* lock all the backends */
1450 int partition_read_lock(struct ldb_module *module)
1452 int i = 0;
1453 int ret = 0;
1454 int ret2 = 0;
1455 struct ldb_context *ldb = ldb_module_get_ctx(module);
1456 struct partition_private_data *data = \
1457 talloc_get_type(ldb_module_get_private(module),
1458 struct partition_private_data);
1460 if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1461 ldb_debug(ldb, LDB_DEBUG_TRACE,
1462 "partition_read_lock() -> (metadata partition)");
1466 * It is important to only do this for LOCK because:
1467 * - we don't want to unlock what we did not lock
1469 * - we don't want to make a new lock on the sam.ldb
1470 * (triggered inside this routine due to the seq num check)
1471 * during an unlock phase as that will violate the lock
1472 * ordering
1475 if (data == NULL) {
1476 TALLOC_CTX *mem_ctx = talloc_new(module);
1478 data = talloc_zero(mem_ctx, struct partition_private_data);
1479 if (data == NULL) {
1480 talloc_free(mem_ctx);
1481 return ldb_operr(ldb);
1485 * When used from Samba4, this message is set by the
1486 * samba4 module, as a fixed value not read from the
1487 * DB. This avoids listing modules in the DB
1489 data->forced_module_msg = talloc_get_type(
1490 ldb_get_opaque(ldb,
1491 DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
1492 struct ldb_message);
1494 ldb_module_set_private(module, talloc_steal(module,
1495 data));
1496 talloc_free(mem_ctx);
1500 * This will lock sam.ldb and will also call event loops,
1501 * so we do it before we get the whole db lock.
1503 ret = partition_reload_if_required(module, data, NULL);
1504 if (ret != LDB_SUCCESS) {
1505 return ret;
1509 * Order of read_lock calls must match that in partition_start_trans.
1510 * See comment in that function for detail.
1512 ret = partition_metadata_read_lock(module);
1513 if (ret != LDB_SUCCESS) {
1514 goto failed;
1518 * The top level DB (sam.ldb) lock is not enough to block another
1519 * process in prepare_commit(), because if nothing was changed in the
1520 * specific backend, then prepare_commit() is a no-op. Therefore the
1521 * metadata.tdb lock is taken out above, as it is the best we can do
1522 * right now.
1524 ret = ldb_next_read_lock(module);
1525 if (ret != LDB_SUCCESS) {
1526 ldb_debug_set(ldb,
1527 LDB_DEBUG_FATAL,
1528 "Failed to lock db: %s / %s for metadata partition",
1529 ldb_errstring(ldb),
1530 ldb_strerror(ret));
1532 return ret;
1536 * The following per partition locks are required mostly because TDB
1537 * and MDB require locks before reads are permitted.
1539 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1540 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1541 ldb_debug(ldb, LDB_DEBUG_TRACE,
1542 "partition_read_lock() -> %s",
1543 ldb_dn_get_linearized(
1544 data->partitions[i]->ctrl->dn));
1546 ret = ldb_next_read_lock(data->partitions[i]->module);
1547 if (ret == LDB_SUCCESS) {
1548 continue;
1551 ldb_debug_set(ldb,
1552 LDB_DEBUG_FATAL,
1553 "Failed to lock db: %s / %s for %s",
1554 ldb_errstring(ldb),
1555 ldb_strerror(ret),
1556 ldb_dn_get_linearized(
1557 data->partitions[i]->ctrl->dn));
1559 goto failed;
1562 return LDB_SUCCESS;
1564 failed:
1565 /* Back it out, if it fails on one */
1566 for (i--; i >= 0; i--) {
1567 ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1568 if (ret2 != LDB_SUCCESS) {
1569 ldb_debug(ldb,
1570 LDB_DEBUG_FATAL,
1571 "Failed to unlock db: %s / %s",
1572 ldb_errstring(ldb),
1573 ldb_strerror(ret2));
1576 ret2 = ldb_next_read_unlock(module);
1577 if (ret2 != LDB_SUCCESS) {
1578 ldb_debug(ldb,
1579 LDB_DEBUG_FATAL,
1580 "Failed to unlock db: %s / %s",
1581 ldb_errstring(ldb),
1582 ldb_strerror(ret2));
1584 return ret;
1587 /* unlock all the backends */
1588 int partition_read_unlock(struct ldb_module *module)
1590 int i;
1591 int ret = LDB_SUCCESS;
1592 int ret2;
1593 struct ldb_context *ldb = ldb_module_get_ctx(module);
1594 struct partition_private_data *data = \
1595 talloc_get_type(ldb_module_get_private(module),
1596 struct partition_private_data);
1597 bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1600 * Order of read_unlock calls must be the reverse of that in
1601 * partition_start_trans. See comment in that function for detail.
1603 if (data && data->partitions) {
1604 /* Just counting the partitions */
1605 for (i=0; data->partitions[i]; i++) {}
1607 /* now walk them backwards */
1608 for (i--; i>=0; i--) {
1609 struct dsdb_partition *p = data->partitions[i];
1610 if (trace) {
1611 ldb_debug(ldb, LDB_DEBUG_TRACE,
1612 "partition_read_unlock() -> %s",
1613 ldb_dn_get_linearized(p->ctrl->dn));
1615 ret2 = ldb_next_read_unlock(p->module);
1616 if (ret2 != LDB_SUCCESS) {
1617 ldb_debug_set(ldb,
1618 LDB_DEBUG_FATAL,
1619 "Failed to lock db: %s / %s for %s",
1620 ldb_errstring(ldb),
1621 ldb_strerror(ret),
1622 ldb_dn_get_linearized(p->ctrl->dn));
1625 * Don't overwrite the original failure code
1626 * if there was one
1628 if (ret == LDB_SUCCESS) {
1629 ret = ret2;
1635 if (trace) {
1636 ldb_debug(ldb, LDB_DEBUG_TRACE,
1637 "partition_read_unlock() -> (metadata partition)");
1640 ret2 = ldb_next_read_unlock(module);
1641 if (ret2 != LDB_SUCCESS) {
1642 ldb_debug_set(ldb,
1643 LDB_DEBUG_FATAL,
1644 "Failed to unlock db: %s / %s for metadata partition",
1645 ldb_errstring(ldb),
1646 ldb_strerror(ret2));
1649 * Don't overwrite the original failure code
1650 * if there was one
1652 if (ret == LDB_SUCCESS) {
1653 ret = ret2;
1657 ret = partition_metadata_read_unlock(module);
1660 * Don't overwrite the original failure code
1661 * if there was one
1663 if (ret == LDB_SUCCESS) {
1664 ret = ret2;
1667 return ret;
1670 /* extended */
1671 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1673 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1674 struct partition_private_data);
1675 struct partition_context *ac;
1676 int ret;
1678 /* if we aren't initialised yet go further */
1679 if (!data) {
1680 return ldb_next_request(module, req);
1683 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1684 /* Update the metadata.tdb to increment the schema version if needed*/
1685 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1686 ret = partition_metadata_inc_schema_sequence(module);
1687 return ldb_module_done(req, NULL, NULL, ret);
1690 if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1691 return partition_sequence_number(module, req);
1694 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1695 return partition_create(module, req);
1699 * as the extended operation has no dn
1700 * we need to send it to all partitions
1703 ac = partition_init_ctx(module, req);
1704 if (!ac) {
1705 return ldb_operr(ldb_module_get_ctx(module));
1708 return partition_send_all(module, ac, req);
1711 static const struct ldb_module_ops ldb_partition_module_ops = {
1712 .name = "partition",
1713 .init_context = partition_init,
1714 .search = partition_search,
1715 .add = partition_add,
1716 .modify = partition_modify,
1717 .del = partition_delete,
1718 .rename = partition_rename,
1719 .extended = partition_extended,
1720 .start_transaction = partition_start_trans,
1721 .prepare_commit = partition_prepare_commit,
1722 .end_transaction = partition_end_trans,
1723 .del_transaction = partition_del_trans,
1724 .read_lock = partition_read_lock,
1725 .read_unlock = partition_read_unlock
1728 int ldb_partition_module_init(const char *version)
1730 LDB_MODULE_CHECK_VERSION(version);
1731 return ldb_register_module(&ldb_partition_module_ops);