ctdb-failover: Split statd_callout add-client/del-client
[Samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
blobbd636c7a0b55844c529345e682f360f0a0107918
/*
   Partitions ldb module

   Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
   Copyright (C) Stefan Metzmacher <metze@samba.org> 2007

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

/*
 *  Name: ldb
 *
 *  Component: ldb partitions module
 *
 *  Description: Implement LDAP partitions
 *
 *  Author: Andrew Bartlett
 *  Author: Stefan Metzmacher
 */

32 #include "dsdb/samdb/ldb_modules/partition.h"
/*
 * One prepared sub-request together with the module it must be handed
 * to when its turn comes.
 */
struct part_request {
	/* module the sub-request is passed to via partition_request() */
	struct ldb_module *module;
	/* the sub-request built by partition_prep_request() */
	struct ldb_request *req;
};

/*
 * Per-request state of the partition module: the original request plus
 * the list of sub-requests it was split into.
 */
struct partition_context {
	/* the partition module instance and the caller's original request */
	struct ldb_module *module;
	struct ldb_request *req;

	/* array of prepared sub-requests, num_requests entries long */
	struct part_request *part_req;
	unsigned int num_requests;
	/* number of sub-requests that have already replied with DONE */
	unsigned int finished_requests;

	/* NULL terminated referral list sent just before the final DONE */
	const char **referrals;
};

50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
52 struct partition_context *ac;
54 ac = talloc_zero(req, struct partition_context);
55 if (ac == NULL) {
56 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57 return NULL;
60 ac->module = module;
61 ac->req = req;
63 return ac;
67 * helper functions to call the next module in chain
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
71 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72 const struct dsdb_control_current_partition *partition = NULL;
73 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74 if (partition_ctrl) {
75 partition = talloc_get_type(partition_ctrl->data,
76 struct dsdb_control_current_partition);
79 if (partition != NULL) {
80 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81 ldb_dn_get_linearized(partition->dn));
82 } else {
83 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
87 return ldb_next_request(module, request);
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91 struct ldb_dn *dn,
92 struct ldb_request *req)
94 unsigned int i;
95 struct ldb_control *partition_ctrl;
97 /* see if the request has the partition DN specified in a
98 * control. The repl_meta_data module can specify this to
99 * ensure that replication happens to the right partition
101 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102 if (partition_ctrl) {
103 const struct dsdb_control_current_partition *partition;
104 partition = talloc_get_type(partition_ctrl->data,
105 struct dsdb_control_current_partition);
106 if (partition != NULL) {
107 dn = partition->dn;
111 if (dn == NULL) {
112 return NULL;
115 /* Look at base DN */
116 /* Figure out which partition it is under */
117 /* Skip the lot if 'data' isn't here yet (initialisation) */
118 for (i=0; data && data->partitions && data->partitions[i]; i++) {
119 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120 return data->partitions[i];
124 return NULL;
128 * fire the caller's callback for every entry, but only send 'done' once.
130 static int partition_req_callback(struct ldb_request *req,
131 struct ldb_reply *ares)
133 struct partition_context *ac;
134 struct ldb_module *module;
135 struct ldb_request *nreq;
136 int ret;
137 struct ldb_control *partition_ctrl;
139 ac = talloc_get_type(req->context, struct partition_context);
141 if (!ares) {
142 return ldb_module_done(ac->req, NULL, NULL,
143 LDB_ERR_OPERATIONS_ERROR);
146 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147 if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148 /* If we didn't fan this request out to multiple partitions,
149 * or this is an individual search result, we can
150 * deterministically tell the caller what partition this was
151 * written to (repl_meta_data likes to know) */
152 ret = ldb_reply_add_control(ares,
153 DSDB_CONTROL_CURRENT_PARTITION_OID,
154 false, partition_ctrl->data);
155 if (ret != LDB_SUCCESS) {
156 return ldb_module_done(ac->req, NULL, NULL,
157 ret);
161 if (ares->error != LDB_SUCCESS) {
162 return ldb_module_done(ac->req, ares->controls,
163 ares->response, ares->error);
166 switch (ares->type) {
167 case LDB_REPLY_REFERRAL:
168 return ldb_module_send_referral(ac->req, ares->referral);
170 case LDB_REPLY_ENTRY:
171 if (ac->req->operation != LDB_SEARCH) {
172 ldb_set_errstring(ldb_module_get_ctx(ac->module),
173 "partition_req_callback:"
174 " Unsupported reply type for this request");
175 return ldb_module_done(ac->req, NULL, NULL,
176 LDB_ERR_OPERATIONS_ERROR);
179 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
181 case LDB_REPLY_DONE:
182 if (ac->req->operation == LDB_EXTENDED) {
183 /* FIXME: check for ares->response, replmd does not fill it ! */
184 if (ares->response) {
185 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186 ldb_set_errstring(ldb_module_get_ctx(ac->module),
187 "partition_req_callback:"
188 " Unknown extended reply, "
189 "only supports START_TLS");
190 talloc_free(ares);
191 return ldb_module_done(ac->req, NULL, NULL,
192 LDB_ERR_OPERATIONS_ERROR);
197 ac->finished_requests++;
198 if (ac->finished_requests == ac->num_requests) {
199 /* Send back referrals if they do exist (search ops) */
200 if (ac->referrals != NULL) {
201 const char **ref;
202 for (ref = ac->referrals; *ref != NULL; ++ref) {
203 ret = ldb_module_send_referral(ac->req,
204 talloc_strdup(ac->req, *ref));
205 if (ret != LDB_SUCCESS) {
206 return ldb_module_done(ac->req, NULL, NULL,
207 ret);
212 /* this was the last one, call callback */
213 return ldb_module_done(ac->req, ares->controls,
214 ares->response,
215 ares->error);
218 /* not the last, now call the next one */
219 module = ac->part_req[ac->finished_requests].module;
220 nreq = ac->part_req[ac->finished_requests].req;
222 ret = partition_request(module, nreq);
223 if (ret != LDB_SUCCESS) {
224 talloc_free(ares);
225 return ldb_module_done(ac->req, NULL, NULL, ret);
228 break;
231 talloc_free(ares);
232 return LDB_SUCCESS;
235 static int partition_prep_request(struct partition_context *ac,
236 struct dsdb_partition *partition)
238 int ret;
239 struct ldb_request *req;
240 struct ldb_control *partition_ctrl = NULL;
241 void *part_data = NULL;
243 ac->part_req = talloc_realloc(ac, ac->part_req,
244 struct part_request,
245 ac->num_requests + 1);
246 if (ac->part_req == NULL) {
247 return ldb_oom(ldb_module_get_ctx(ac->module));
250 switch (ac->req->operation) {
251 case LDB_SEARCH:
252 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
253 ac->part_req,
254 ac->req->op.search.base,
255 ac->req->op.search.scope,
256 ac->req->op.search.tree,
257 ac->req->op.search.attrs,
258 ac->req->controls,
259 ac, partition_req_callback,
260 ac->req);
261 LDB_REQ_SET_LOCATION(req);
262 break;
263 case LDB_ADD:
264 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
265 ac->req->op.add.message,
266 ac->req->controls,
267 ac, partition_req_callback,
268 ac->req);
269 LDB_REQ_SET_LOCATION(req);
270 break;
271 case LDB_MODIFY:
272 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273 ac->req->op.mod.message,
274 ac->req->controls,
275 ac, partition_req_callback,
276 ac->req);
277 LDB_REQ_SET_LOCATION(req);
278 break;
279 case LDB_DELETE:
280 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
281 ac->req->op.del.dn,
282 ac->req->controls,
283 ac, partition_req_callback,
284 ac->req);
285 LDB_REQ_SET_LOCATION(req);
286 break;
287 case LDB_RENAME:
288 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
289 ac->req->op.rename.olddn,
290 ac->req->op.rename.newdn,
291 ac->req->controls,
292 ac, partition_req_callback,
293 ac->req);
294 LDB_REQ_SET_LOCATION(req);
295 break;
296 case LDB_EXTENDED:
297 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
298 ac->part_req,
299 ac->req->op.extended.oid,
300 ac->req->op.extended.data,
301 ac->req->controls,
302 ac, partition_req_callback,
303 ac->req);
304 LDB_REQ_SET_LOCATION(req);
305 break;
306 default:
307 ldb_set_errstring(ldb_module_get_ctx(ac->module),
308 "Unsupported request type!");
309 ret = LDB_ERR_UNWILLING_TO_PERFORM;
312 if (ret != LDB_SUCCESS) {
313 return ret;
316 ac->part_req[ac->num_requests].req = req;
318 if (ac->req->controls) {
319 /* Duplicate everything beside the current partition control */
320 partition_ctrl = ldb_request_get_control(ac->req,
321 DSDB_CONTROL_CURRENT_PARTITION_OID);
322 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
323 return ldb_module_oom(ac->module);
327 part_data = partition->ctrl;
329 ac->part_req[ac->num_requests].module = partition->module;
331 if (partition_ctrl != NULL) {
332 if (partition_ctrl->data != NULL) {
333 part_data = partition_ctrl->data;
337 * If the provided current partition control is without
338 * data then use the calculated one.
340 ret = ldb_request_add_control(req,
341 DSDB_CONTROL_CURRENT_PARTITION_OID,
342 false, part_data);
343 if (ret != LDB_SUCCESS) {
344 return ret;
348 if (req->operation == LDB_SEARCH) {
350 * If the search is for 'more' than this partition,
351 * then change the basedn, so the check of the BASE DN
352 * still passes in the ldb_key_value layer
354 if (ldb_dn_compare_base(partition->ctrl->dn,
355 req->op.search.base) != 0) {
356 req->op.search.base = partition->ctrl->dn;
360 ac->num_requests++;
362 return LDB_SUCCESS;
365 static int partition_call_first(struct partition_context *ac)
367 return partition_request(ac->part_req[0].module, ac->part_req[0].req);
371 * Send a request down to all the partitions (but not the sam.ldb file)
373 static int partition_send_all(struct ldb_module *module,
374 struct partition_context *ac,
375 struct ldb_request *req)
377 unsigned int i;
378 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
379 struct partition_private_data);
380 int ret;
382 for (i=0; data && data->partitions && data->partitions[i]; i++) {
383 ret = partition_prep_request(ac, data->partitions[i]);
384 if (ret != LDB_SUCCESS) {
385 return ret;
389 /* fire the first one */
390 return partition_call_first(ac);
/*
 * State handed to partition_copy_all_callback_handler() while an
 * operation on a replicated special DN is in flight.
 */
struct partition_copy_context {
	/* the partition module instance */
	struct ldb_module *module;
	/* context created by partition_init_ctx() for the request */
	struct partition_context *partition_context;
	/* the caller's original request */
	struct ldb_request *request;
	/* the special DN being operated on */
	struct ldb_dn *dn;
};

401 * A special DN has been updated in the primary partition. Now propagate those
402 * changes to the remaining partitions.
404 * Note: that the operations are asynchronous and this function is called
405 * from partition_copy_all_callback_handler in response to an async
406 * callback.
408 static int partition_copy_all_callback_action(
409 struct ldb_module *module,
410 struct partition_context *ac,
411 struct ldb_request *req,
412 struct ldb_dn *dn)
416 unsigned int i;
417 struct partition_private_data *data =
418 talloc_get_type(
419 ldb_module_get_private(module),
420 struct partition_private_data);
421 int search_ret;
422 struct ldb_result *res;
423 /* now fetch the resulting object, and then copy it to all the
424 * other partitions. We need this approach to cope with the
425 * partitions getting out of sync. If for example the
426 * @ATTRIBUTES object exists on one partition but not the
427 * others then just doing each of the partitions in turn will
428 * lead to an error
430 search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
431 if (search_ret != LDB_SUCCESS && search_ret != LDB_ERR_NO_SUCH_OBJECT) {
432 return search_ret;
435 /* now delete the object in the other partitions, if required
437 if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
438 for (i=0; data->partitions && data->partitions[i]; i++) {
439 int pret;
440 pret = dsdb_module_del(data->partitions[i]->module,
442 DSDB_FLAG_NEXT_MODULE,
443 req);
444 if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
445 /* we should only get success or no
446 such object from the other partitions */
447 return pret;
451 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
454 /* now add/modify in the other partitions */
455 for (i=0; data->partitions && data->partitions[i]; i++) {
456 struct ldb_message *modify_msg = NULL;
457 int pret;
458 unsigned int el_idx;
460 pret = dsdb_module_add(data->partitions[i]->module,
461 res->msgs[0],
462 DSDB_FLAG_NEXT_MODULE,
463 req);
464 if (pret == LDB_SUCCESS) {
465 continue;
468 if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
469 return pret;
472 modify_msg = ldb_msg_copy(req, res->msgs[0]);
473 if (modify_msg == NULL) {
474 return ldb_module_oom(module);
478 * mark all the message elements as
479 * LDB_FLAG_MOD_REPLACE
481 for (el_idx=0;
482 el_idx < modify_msg->num_elements;
483 el_idx++) {
484 modify_msg->elements[el_idx].flags
485 = LDB_FLAG_MOD_REPLACE;
488 if (req->operation == LDB_MODIFY) {
489 const struct ldb_message *req_msg = req->op.mod.message;
491 * mark elements to be removed, if these were
492 * deleted entirely above we need to delete
493 * them here too
495 for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
496 if (LDB_FLAG_MOD_TYPE(req_msg->elements[el_idx].flags) == LDB_FLAG_MOD_DELETE
497 || ((LDB_FLAG_MOD_TYPE(req_msg->elements[el_idx].flags) == LDB_FLAG_MOD_REPLACE) &&
498 req_msg->elements[el_idx].num_values == 0)) {
499 if (ldb_msg_find_element(modify_msg,
500 req_msg->elements[el_idx].name) != NULL) {
501 continue;
503 pret = ldb_msg_add_empty(
504 modify_msg,
505 req_msg->elements[el_idx].name,
506 LDB_FLAG_MOD_REPLACE,
507 NULL);
508 if (pret != LDB_SUCCESS) {
509 return pret;
515 pret = dsdb_module_modify(data->partitions[i]->module,
516 modify_msg,
517 DSDB_FLAG_NEXT_MODULE,
518 req);
520 if (pret != LDB_SUCCESS) {
521 return pret;
525 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
530 * @brief call back function for the ldb operations on special DN's.
532 * As the LDB operations are async, and we wish to use the result
533 * the operations, a callback needs to be registered to process the results
534 * of the LDB operations.
536 * @param req the ldb request
537 * @param res the result of the operation
539 * @return the LDB_STATUS
541 static int partition_copy_all_callback_handler(
542 struct ldb_request *req,
543 struct ldb_reply *ares)
545 struct partition_copy_context *ac = NULL;
547 ac = talloc_get_type(
548 req->context,
549 struct partition_copy_context);
551 if (!ares) {
552 return ldb_module_done(
553 ac->request,
554 NULL,
555 NULL,
556 LDB_ERR_OPERATIONS_ERROR);
559 /* pass on to the callback */
560 switch (ares->type) {
561 case LDB_REPLY_ENTRY:
562 return ldb_module_send_entry(
563 ac->request,
564 ares->message,
565 ares->controls);
567 case LDB_REPLY_REFERRAL:
568 return ldb_module_send_referral(
569 ac->request,
570 ares->referral);
572 case LDB_REPLY_DONE: {
573 int error = ares->error;
574 if (error == LDB_SUCCESS) {
575 error = partition_copy_all_callback_action(
576 ac->module,
577 ac->partition_context,
578 ac->request,
579 ac->dn);
581 return ldb_module_done(
582 ac->request,
583 ares->controls,
584 ares->response,
585 error);
588 default:
589 /* Can't happen */
590 return LDB_ERR_OPERATIONS_ERROR;
595 * send an operation to the top partition, then copy the resulting
596 * object to all other partitions.
598 static int partition_copy_all(
599 struct ldb_module *module,
600 struct partition_context *partition_context,
601 struct ldb_request *req,
602 struct ldb_dn *dn)
604 struct ldb_request *new_req = NULL;
605 struct ldb_context *ldb = NULL;
606 struct partition_copy_context *context = NULL;
608 int ret;
610 ldb = ldb_module_get_ctx(module);
612 context = talloc_zero(req, struct partition_copy_context);
613 if (context == NULL) {
614 return ldb_oom(ldb);
616 context->module = module;
617 context->request = req;
618 context->dn = dn;
619 context->partition_context = partition_context;
621 switch (req->operation) {
622 case LDB_ADD:
623 ret = ldb_build_add_req(
624 &new_req,
625 ldb,
626 req,
627 req->op.add.message,
628 req->controls,
629 context,
630 partition_copy_all_callback_handler,
631 req);
632 break;
633 case LDB_MODIFY:
634 ret = ldb_build_mod_req(
635 &new_req,
636 ldb,
637 req,
638 req->op.mod.message,
639 req->controls,
640 context,
641 partition_copy_all_callback_handler,
642 req);
643 break;
644 case LDB_DELETE:
645 ret = ldb_build_del_req(
646 &new_req,
647 ldb,
648 req,
649 req->op.del.dn,
650 req->controls,
651 context,
652 partition_copy_all_callback_handler,
653 req);
654 break;
655 case LDB_RENAME:
656 ret = ldb_build_rename_req(
657 &new_req,
658 ldb,
659 req,
660 req->op.rename.olddn,
661 req->op.rename.newdn,
662 req->controls,
663 context,
664 partition_copy_all_callback_handler,
665 req);
666 break;
667 default:
669 * Shouldn't happen.
671 ldb_debug(
672 ldb,
673 LDB_DEBUG_ERROR,
674 "Unexpected operation type (%d)\n", req->operation);
675 ret = LDB_ERR_OPERATIONS_ERROR;
676 break;
678 if (ret != LDB_SUCCESS) {
679 return ret;
681 return ldb_next_request(module, new_req);
684 * Figure out which backend a request needs to be aimed at. Some
685 * requests must be replicated to all backends
687 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
689 struct partition_context *ac;
690 unsigned int i;
691 int ret;
692 struct dsdb_partition *partition;
693 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
694 struct partition_private_data);
696 /* if we aren't initialised yet go further */
697 if (!data || !data->partitions) {
698 return ldb_next_request(module, req);
701 if (ldb_dn_is_special(dn)) {
702 /* Is this a special DN, we need to replicate to every backend? */
703 for (i=0; data->replicate && data->replicate[i]; i++) {
704 if (ldb_dn_compare(data->replicate[i],
705 dn) == 0) {
707 ac = partition_init_ctx(module, req);
708 if (!ac) {
709 return ldb_operr(ldb_module_get_ctx(module));
712 return partition_copy_all(module, ac, req, dn);
717 /* Otherwise, we need to find the partition to fire it to */
719 /* Find partition */
720 partition = find_partition(data, dn, req);
721 if (!partition) {
723 * if we haven't found a matching partition
724 * pass the request to the main ldb
726 * TODO: we should maybe return an error here
727 * if it's not a special dn
730 return ldb_next_request(module, req);
733 ac = partition_init_ctx(module, req);
734 if (!ac) {
735 return ldb_operr(ldb_module_get_ctx(module));
738 /* we need to add a control but we never touch the original request */
739 ret = partition_prep_request(ac, partition);
740 if (ret != LDB_SUCCESS) {
741 return ret;
744 /* fire the first one */
745 return partition_call_first(ac);
748 /* search */
749 static int partition_search(struct ldb_module *module, struct ldb_request *req)
751 /* Find backend */
752 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
753 struct partition_private_data);
754 struct partition_context *ac;
755 struct ldb_context *ldb;
756 struct loadparm_context *lp_ctx;
758 struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
759 struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
760 struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
762 struct ldb_search_options_control *search_options = NULL;
763 struct dsdb_partition *p;
764 unsigned int i, j;
765 int ret;
766 bool domain_scope = false, phantom_root = false;
768 p = find_partition(data, NULL, req);
769 if (p != NULL) {
770 /* the caller specified what partition they want the
771 * search - just pass it on
773 return ldb_next_request(p->module, req);
776 /* Get back the search options from the search control, and mark it as
777 * non-critical (to make backends and also dcpromo happy).
779 if (search_control) {
780 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
781 search_control->critical = 0;
785 /* if we aren't initialised yet go further */
786 if (!data || !data->partitions) {
787 return ldb_next_request(module, req);
790 /* Special DNs without specified partition should go further */
791 if (ldb_dn_is_special(req->op.search.base)) {
792 return ldb_next_request(module, req);
795 /* Locate the options */
796 domain_scope = (search_options
797 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
798 || domain_scope_control;
799 phantom_root = search_options
800 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
802 /* Remove handled options from the search control flag */
803 if (search_options) {
804 search_options->search_options = search_options->search_options
805 & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
806 & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
809 ac = partition_init_ctx(module, req);
810 if (!ac) {
811 return ldb_operr(ldb_module_get_ctx(module));
814 ldb = ldb_module_get_ctx(ac->module);
815 lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
816 struct loadparm_context);
818 /* Search from the base DN */
819 if (ldb_dn_is_null(req->op.search.base)) {
820 if (!phantom_root) {
821 return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
823 return partition_send_all(module, ac, req);
826 for (i=0; data->partitions[i]; i++) {
827 bool match = false, stop = false;
829 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
830 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
831 req->op.search.base) == 0) {
832 /* base DN is in a partial replica
833 with the NO_GLOBAL_CATALOG
834 control. This partition is invisible */
835 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
836 continue;
840 if (phantom_root) {
841 /* Phantom root: Find all partitions under the
842 * search base. We match if:
844 * 1) the DN we are looking for exactly matches a
845 * certain partition and always stop
846 * 2) the DN we are looking for is a parent of certain
847 * partitions and it isn't a scope base search
848 * 3) the DN we are looking for is a child of a certain
849 * partition and always stop
850 * - we don't need to go any further up in the
851 * hierarchy!
853 if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
854 req->op.search.base) == 0) {
855 match = true;
856 stop = true;
858 if (!match &&
859 (ldb_dn_compare_base(req->op.search.base,
860 data->partitions[i]->ctrl->dn) == 0 &&
861 req->op.search.scope != LDB_SCOPE_BASE)) {
862 match = true;
864 if (!match &&
865 ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
866 req->op.search.base) == 0) {
867 match = true;
868 stop = true; /* note that this relies on partition ordering */
870 } else {
871 /* Domain scope: Find all partitions under the search
872 * base.
874 * We generate referral candidates if we haven't
875 * specified the domain scope control, haven't a base
876 * search* scope and the DN we are looking for is a real
877 * predecessor of certain partitions. When a new
878 * referral candidate is nearer to the DN than an
879 * existing one delete the latter (we want to have only
880 * the closest ones). When we checked this for all
881 * candidates we have the final referrals.
883 * We match if the DN we are looking for is a child of
884 * a certain partition or the partition
885 * DN itself - we don't need to go any further
886 * up in the hierarchy!
888 if ((!domain_scope) &&
889 (req->op.search.scope != LDB_SCOPE_BASE) &&
890 (ldb_dn_compare_base(req->op.search.base,
891 data->partitions[i]->ctrl->dn) == 0) &&
892 (ldb_dn_compare(req->op.search.base,
893 data->partitions[i]->ctrl->dn) != 0)) {
894 const char *scheme = ldb_get_opaque(
895 ldb, LDAP_REFERRAL_SCHEME_OPAQUE);
896 char *ref = talloc_asprintf(
898 "%s://%s/%s%s",
899 scheme == NULL ? "ldap" : scheme,
900 lpcfg_dnsdomain(lp_ctx),
901 ldb_dn_get_linearized(
902 data->partitions[i]->ctrl->dn),
903 req->op.search.scope ==
904 LDB_SCOPE_ONELEVEL ? "??base" : "");
906 if (ref == NULL) {
907 return ldb_oom(ldb);
910 /* Initialise the referrals list */
911 if (ac->referrals == NULL) {
912 char **l = str_list_make_empty(ac);
913 ac->referrals = discard_const_p(const char *, l);
914 if (ac->referrals == NULL) {
915 return ldb_oom(ldb);
919 /* Check if the new referral candidate is
920 * closer to the base DN than already
921 * saved ones and delete the latters */
922 j = 0;
923 while (ac->referrals[j] != NULL) {
924 if (strstr(ac->referrals[j],
925 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
926 str_list_remove(ac->referrals,
927 ac->referrals[j]);
928 } else {
929 ++j;
933 /* Add our new candidate */
934 ac->referrals = str_list_add(ac->referrals, ref);
936 talloc_free(ref);
938 if (ac->referrals == NULL) {
939 return ldb_oom(ldb);
942 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
943 match = true;
944 stop = true; /* note that this relies on partition ordering */
948 if (match) {
949 ret = partition_prep_request(ac, data->partitions[i]);
950 if (ret != LDB_SUCCESS) {
951 return ret;
955 if (stop) break;
958 /* Perhaps we didn't match any partitions. Try the main partition */
959 if (ac->num_requests == 0) {
960 talloc_free(ac);
961 return ldb_next_request(module, req);
964 /* fire the first one */
965 return partition_call_first(ac);
968 /* add */
969 static int partition_add(struct ldb_module *module, struct ldb_request *req)
971 return partition_replicate(module, req, req->op.add.message->dn);
974 /* modify */
975 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
977 return partition_replicate(module, req, req->op.mod.message->dn);
980 /* delete */
981 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
983 return partition_replicate(module, req, req->op.del.dn);
986 /* rename */
987 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
989 /* Find backend */
990 struct dsdb_partition *backend, *backend2;
992 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
993 struct partition_private_data);
995 /* Skip the lot if 'data' isn't here yet (initialisation) */
996 if (!data) {
997 return ldb_operr(ldb_module_get_ctx(module));
1000 backend = find_partition(data, req->op.rename.olddn, req);
1001 backend2 = find_partition(data, req->op.rename.newdn, req);
1003 if ((backend && !backend2) || (!backend && backend2)) {
1004 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1007 if (backend != backend2) {
1008 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1009 "Cannot rename from %s in %s to %s in %s: %s",
1010 ldb_dn_get_linearized(req->op.rename.olddn),
1011 ldb_dn_get_linearized(backend->ctrl->dn),
1012 ldb_dn_get_linearized(req->op.rename.newdn),
1013 ldb_dn_get_linearized(backend2->ctrl->dn),
1014 ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
1015 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1018 return partition_replicate(module, req, req->op.rename.olddn);
1021 /* start a transaction */
1022 int partition_start_trans(struct ldb_module *module)
1024 int i = 0;
1025 int ret = 0;
1026 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1027 struct partition_private_data);
1028 /* Look at base DN */
1029 /* Figure out which partition it is under */
1030 /* Skip the lot if 'data' isn't here yet (initialization) */
1031 if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
1032 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
1036 * We start a transaction on metadata.tdb first and end it last in
1037 * end_trans. This makes locking semantics follow TDB rather than MDB,
1038 * and effectively locks all partitions at once.
1039 * Detail:
1040 * Samba AD is special in that the partitions module (this file)
1041 * combines multiple independently locked databases into one overall
1042 * transaction. Changes across multiple partition DBs in a single
1043 * transaction must ALL be either visible or invisible.
1044 * The way this is achieved is by taking out a write lock on
1045 * metadata.tdb at the start of prepare_commit, while unlocking it at
1046 * the end of end_trans. This is matched by read_lock, ensuring it
1047 * can't progress until that write lock is released.
1049 * metadata.tdb needs to be a TDB file because MDB uses independent
1050 * locks, which means a read lock and a write lock can be held at the
1051 * same time, whereas in TDB, the two locks block each other. The TDB
1052 * behaviour is required to implement the functionality described
1053 * above.
1055 * An important additional detail here is that if prepare_commit is
1056 * called on a TDB without any changes being made, no write lock is
1057 * taken. We address this by storing a sequence number in metadata.tdb
1058 * which is updated every time a replicated attribute is modified.
1059 * The possibility of a few unreplicated attributes being out of date
1060 * turns out not to be a problem.
1061 * For this reason, a lock on sam.ldb (which is a TDB) won't achieve
1062 * the same end as locking metadata.tdb, unless we made a modification
1063 * to the @ records found there before every prepare_commit.
1065 ret = partition_metadata_start_trans(module);
1066 if (ret != LDB_SUCCESS) {
1067 return ret;
1070 ret = ldb_next_start_trans(module);
1071 if (ret != LDB_SUCCESS) {
1072 partition_metadata_del_trans(module);
1073 return ret;
1076 ret = partition_reload_if_required(module, data, NULL);
1077 if (ret != LDB_SUCCESS) {
1078 ldb_next_del_trans(module);
1079 partition_metadata_del_trans(module);
1080 return ret;
1084 * The following per partition locks are required mostly because TDB
1085 * and MDB require locks before read and write ops are permitted.
1087 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1088 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1089 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
1090 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1092 ret = ldb_next_start_trans(data->partitions[i]->module);
1093 if (ret != LDB_SUCCESS) {
1094 /* Back it out, if it fails on one */
1095 for (i--; i >= 0; i--) {
1096 ldb_next_del_trans(data->partitions[i]->module);
1098 ldb_next_del_trans(module);
1099 partition_metadata_del_trans(module);
1100 return ret;
1104 data->in_transaction++;
1106 return LDB_SUCCESS;
1109 /* prepare for a commit */
1110 int partition_prepare_commit(struct ldb_module *module)
1112 unsigned int i;
1113 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1114 struct partition_private_data);
1115 int ret;
1118 * Order of prepare_commit calls must match that in
1119 * partition_start_trans. See comment in that function for detail.
1121 ret = partition_metadata_prepare_commit(module);
1122 if (ret != LDB_SUCCESS) {
1123 return ret;
1126 ret = ldb_next_prepare_commit(module);
1127 if (ret != LDB_SUCCESS) {
1128 return ret;
1131 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1132 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1133 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
1134 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1136 ret = ldb_next_prepare_commit(data->partitions[i]->module);
1137 if (ret != LDB_SUCCESS) {
1138 ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
1139 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1140 ldb_errstring(ldb_module_get_ctx(module)));
1141 return ret;
1145 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1146 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
1149 return LDB_SUCCESS;
1153 /* end a transaction */
1154 int partition_end_trans(struct ldb_module *module)
1156 int ret, ret2;
1157 int i;
1158 struct ldb_context *ldb = ldb_module_get_ctx(module);
1159 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1160 struct partition_private_data);
1161 bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1163 ret = LDB_SUCCESS;
1165 if (data->in_transaction == 0) {
1166 DEBUG(0,("partition end transaction mismatch\n"));
1167 ret = LDB_ERR_OPERATIONS_ERROR;
1168 } else {
1169 data->in_transaction--;
1173 * Order of end_trans calls must be the reverse of that in
1174 * partition_start_trans. See comment in that function for detail.
1176 if (data && data->partitions) {
1177 /* Just counting the partitions */
1178 for (i=0; data->partitions[i]; i++) {}
1180 /* now walk them backwards */
1181 for (i--; i>=0; i--) {
1182 struct dsdb_partition *p = data->partitions[i];
1183 if (trace) {
1184 ldb_debug(ldb,
1185 LDB_DEBUG_TRACE,
1186 "partition_end_trans() -> %s",
1187 ldb_dn_get_linearized(p->ctrl->dn));
1189 ret2 = ldb_next_end_trans(p->module);
1190 if (ret2 != LDB_SUCCESS) {
1191 ldb_asprintf_errstring(ldb,
1192 "end_trans error on %s: %s",
1193 ldb_dn_get_linearized(p->ctrl->dn),
1194 ldb_errstring(ldb));
1195 ret = ret2;
1200 if (trace) {
1201 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
1203 ret2 = ldb_next_end_trans(module);
1204 if (ret2 != LDB_SUCCESS) {
1205 ret = ret2;
1208 ret2 = partition_metadata_end_trans(module);
1209 if (ret2 != LDB_SUCCESS) {
1210 ret = ret2;
1213 return ret;
1216 /* delete a transaction */
1217 int partition_del_trans(struct ldb_module *module)
1219 int ret, final_ret = LDB_SUCCESS;
1220 int i;
1221 struct ldb_context *ldb = ldb_module_get_ctx(module);
1222 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1223 struct partition_private_data);
1224 bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1226 if (data == NULL) {
1227 DEBUG(0,("partition delete transaction with no private data\n"));
1228 return ldb_operr(ldb);
1232 * Order of del_trans calls must be the reverse of that in
1233 * partition_start_trans. See comment in that function for detail.
1235 if (data->partitions) {
1236 /* Just counting the partitions */
1237 for (i=0; data->partitions[i]; i++) {}
1239 /* now walk them backwards */
1240 for (i--; i>=0; i--) {
1241 struct dsdb_partition *p = data->partitions[i];
1242 if (trace) {
1243 ldb_debug(ldb,
1244 LDB_DEBUG_TRACE,
1245 "partition_del_trans() -> %s",
1246 ldb_dn_get_linearized(p->ctrl->dn));
1248 ret = ldb_next_del_trans(p->module);
1249 if (ret != LDB_SUCCESS) {
1250 ldb_asprintf_errstring(ldb,
1251 "del_trans error on %s: %s",
1252 ldb_dn_get_linearized(p->ctrl->dn),
1253 ldb_errstring(ldb));
1254 final_ret = ret;
1259 if (trace) {
1260 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
1262 ret = ldb_next_del_trans(module);
1263 if (ret != LDB_SUCCESS) {
1264 final_ret = ret;
1267 ret = partition_metadata_del_trans(module);
1268 if (ret != LDB_SUCCESS) {
1269 final_ret = ret;
1272 if (data->in_transaction == 0) {
1273 DEBUG(0,("partition del transaction mismatch\n"));
1274 return ldb_operr(ldb_module_get_ctx(module));
1276 data->in_transaction--;
1278 return final_ret;
1281 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx,
1282 uint64_t *seq_number,
1283 struct ldb_request *parent)
1285 int ret;
1286 struct ldb_result *res;
1287 struct ldb_seqnum_request *tseq;
1288 struct ldb_seqnum_result *seqr;
1290 tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
1291 if (tseq == NULL) {
1292 return ldb_oom(ldb_module_get_ctx(module));
1294 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1296 ret = dsdb_module_extended(module, tseq, &res,
1297 LDB_EXTENDED_SEQUENCE_NUMBER,
1298 tseq,
1299 DSDB_FLAG_NEXT_MODULE,
1300 parent);
1301 if (ret != LDB_SUCCESS) {
1302 talloc_free(tseq);
1303 return ret;
1306 seqr = talloc_get_type_abort(res->extended->data,
1307 struct ldb_seqnum_result);
1308 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1309 talloc_free(res);
1310 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1311 "Primary backend in partition module returned a timestamp based seq");
1314 *seq_number = seqr->seq_num;
1315 talloc_free(tseq);
1316 return LDB_SUCCESS;
1321 * Older version of sequence number as sum of sequence numbers for each partition
1323 int partition_sequence_number_from_partitions(struct ldb_module *module,
1324 uint64_t *seqr)
1326 int ret;
1327 unsigned int i;
1328 uint64_t seq_number = 0;
1329 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1330 struct partition_private_data);
1332 ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1333 if (ret != LDB_SUCCESS) {
1334 return ret;
1337 /* Skip the lot if 'data' isn't here yet (initialisation) */
1338 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1339 struct ldb_seqnum_request *tseq;
1340 struct ldb_seqnum_result *tseqr;
1341 struct ldb_request *treq;
1342 struct ldb_result *res = talloc_zero(data, struct ldb_result);
1343 if (res == NULL) {
1344 return ldb_oom(ldb_module_get_ctx(module));
1346 tseq = talloc_zero(res, struct ldb_seqnum_request);
1347 if (tseq == NULL) {
1348 talloc_free(res);
1349 return ldb_oom(ldb_module_get_ctx(module));
1351 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1353 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1354 LDB_EXTENDED_SEQUENCE_NUMBER,
1355 tseq,
1356 NULL,
1357 res,
1358 ldb_extended_default_callback,
1359 NULL);
1360 LDB_REQ_SET_LOCATION(treq);
1361 if (ret != LDB_SUCCESS) {
1362 talloc_free(res);
1363 return ret;
1366 ret = partition_request(data->partitions[i]->module, treq);
1367 if (ret != LDB_SUCCESS) {
1368 talloc_free(res);
1369 return ret;
1371 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1372 if (ret != LDB_SUCCESS) {
1373 talloc_free(res);
1374 return ret;
1376 tseqr = talloc_get_type(res->extended->data,
1377 struct ldb_seqnum_result);
1378 seq_number += tseqr->seq_num;
1379 talloc_free(res);
1382 *seqr = seq_number;
1383 return LDB_SUCCESS;
1388 * Newer version of sequence number using metadata tdb
1390 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1392 struct ldb_extended *ext;
1393 struct ldb_seqnum_request *seq;
1394 struct ldb_seqnum_result *seqr;
1395 uint64_t seq_number;
1396 int ret;
1398 seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1399 switch (seq->type) {
1400 case LDB_SEQ_NEXT:
1401 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1402 if (ret != LDB_SUCCESS) {
1403 return ret;
1405 break;
1407 case LDB_SEQ_HIGHEST_SEQ:
1408 ret = partition_metadata_sequence_number(module, &seq_number);
1409 if (ret != LDB_SUCCESS) {
1410 return ret;
1412 break;
1414 case LDB_SEQ_HIGHEST_TIMESTAMP:
1415 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1416 "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1419 ext = talloc_zero(req, struct ldb_extended);
1420 if (!ext) {
1421 return ldb_module_oom(module);
1423 seqr = talloc_zero(ext, struct ldb_seqnum_result);
1424 if (seqr == NULL) {
1425 talloc_free(ext);
1426 return ldb_module_oom(module);
1428 ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1429 ext->data = seqr;
1431 seqr->seq_num = seq_number;
1432 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1434 /* send request done */
1435 return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1438 /* lock all the backends */
1439 int partition_read_lock(struct ldb_module *module)
1441 int i = 0;
1442 int ret = 0;
1443 int ret2 = 0;
1444 struct ldb_context *ldb = ldb_module_get_ctx(module);
1445 struct partition_private_data *data = \
1446 talloc_get_type(ldb_module_get_private(module),
1447 struct partition_private_data);
1449 if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1450 ldb_debug(ldb, LDB_DEBUG_TRACE,
1451 "partition_read_lock() -> (metadata partition)");
1455 * It is important to only do this for LOCK because:
1456 * - we don't want to unlock what we did not lock
1458 * - we don't want to make a new lock on the sam.ldb
1459 * (triggered inside this routine due to the seq num check)
1460 * during an unlock phase as that will violate the lock
1461 * ordering
1464 if (data == NULL) {
1465 TALLOC_CTX *mem_ctx = talloc_new(module);
1467 data = talloc_zero(mem_ctx, struct partition_private_data);
1468 if (data == NULL) {
1469 talloc_free(mem_ctx);
1470 return ldb_operr(ldb);
1474 * When used from Samba4, this message is set by the
1475 * samba4 module, as a fixed value not read from the
1476 * DB. This avoids listing modules in the DB
1478 data->forced_module_msg = talloc_get_type(
1479 ldb_get_opaque(ldb,
1480 DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
1481 struct ldb_message);
1483 ldb_module_set_private(module, talloc_steal(module,
1484 data));
1485 talloc_free(mem_ctx);
1489 * This will lock sam.ldb and will also call event loops,
1490 * so we do it before we get the whole db lock.
1492 ret = partition_reload_if_required(module, data, NULL);
1493 if (ret != LDB_SUCCESS) {
1494 return ret;
1498 * Order of read_lock calls must match that in partition_start_trans.
1499 * See comment in that function for detail.
1501 ret = partition_metadata_read_lock(module);
1502 if (ret != LDB_SUCCESS) {
1503 goto failed;
1507 * The top level DB (sam.ldb) lock is not enough to block another
1508 * process in prepare_commit(), because if nothing was changed in the
1509 * specific backend, then prepare_commit() is a no-op. Therefore the
1510 * metadata.tdb lock is taken out above, as it is the best we can do
1511 * right now.
1513 ret = ldb_next_read_lock(module);
1514 if (ret != LDB_SUCCESS) {
1515 ldb_debug_set(ldb,
1516 LDB_DEBUG_FATAL,
1517 "Failed to lock db: %s / %s for metadata partition",
1518 ldb_errstring(ldb),
1519 ldb_strerror(ret));
1521 return ret;
1525 * The following per partition locks are required mostly because TDB
1526 * and MDB require locks before reads are permitted.
1528 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1529 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1530 ldb_debug(ldb, LDB_DEBUG_TRACE,
1531 "partition_read_lock() -> %s",
1532 ldb_dn_get_linearized(
1533 data->partitions[i]->ctrl->dn));
1535 ret = ldb_next_read_lock(data->partitions[i]->module);
1536 if (ret == LDB_SUCCESS) {
1537 continue;
1540 ldb_debug_set(ldb,
1541 LDB_DEBUG_FATAL,
1542 "Failed to lock db: %s / %s for %s",
1543 ldb_errstring(ldb),
1544 ldb_strerror(ret),
1545 ldb_dn_get_linearized(
1546 data->partitions[i]->ctrl->dn));
1548 goto failed;
1551 return LDB_SUCCESS;
1553 failed:
1554 /* Back it out, if it fails on one */
1555 for (i--; i >= 0; i--) {
1556 ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1557 if (ret2 != LDB_SUCCESS) {
1558 ldb_debug(ldb,
1559 LDB_DEBUG_FATAL,
1560 "Failed to unlock db: %s / %s",
1561 ldb_errstring(ldb),
1562 ldb_strerror(ret2));
1565 ret2 = ldb_next_read_unlock(module);
1566 if (ret2 != LDB_SUCCESS) {
1567 ldb_debug(ldb,
1568 LDB_DEBUG_FATAL,
1569 "Failed to unlock db: %s / %s",
1570 ldb_errstring(ldb),
1571 ldb_strerror(ret2));
1573 return ret;
1576 /* unlock all the backends */
1577 int partition_read_unlock(struct ldb_module *module)
1579 int i;
1580 int ret = LDB_SUCCESS;
1581 int ret2;
1582 struct ldb_context *ldb = ldb_module_get_ctx(module);
1583 struct partition_private_data *data = \
1584 talloc_get_type(ldb_module_get_private(module),
1585 struct partition_private_data);
1586 bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1589 * Order of read_unlock calls must be the reverse of that in
1590 * partition_start_trans. See comment in that function for detail.
1592 if (data && data->partitions) {
1593 /* Just counting the partitions */
1594 for (i=0; data->partitions[i]; i++) {}
1596 /* now walk them backwards */
1597 for (i--; i>=0; i--) {
1598 struct dsdb_partition *p = data->partitions[i];
1599 if (trace) {
1600 ldb_debug(ldb, LDB_DEBUG_TRACE,
1601 "partition_read_unlock() -> %s",
1602 ldb_dn_get_linearized(p->ctrl->dn));
1604 ret2 = ldb_next_read_unlock(p->module);
1605 if (ret2 != LDB_SUCCESS) {
1606 ldb_debug_set(ldb,
1607 LDB_DEBUG_FATAL,
1608 "Failed to lock db: %s / %s for %s",
1609 ldb_errstring(ldb),
1610 ldb_strerror(ret2),
1611 ldb_dn_get_linearized(p->ctrl->dn));
1614 * Don't overwrite the original failure code
1615 * if there was one
1617 if (ret == LDB_SUCCESS) {
1618 ret = ret2;
1624 if (trace) {
1625 ldb_debug(ldb, LDB_DEBUG_TRACE,
1626 "partition_read_unlock() -> (metadata partition)");
1629 ret2 = ldb_next_read_unlock(module);
1630 if (ret2 != LDB_SUCCESS) {
1631 ldb_debug_set(ldb,
1632 LDB_DEBUG_FATAL,
1633 "Failed to unlock db: %s / %s for metadata partition",
1634 ldb_errstring(ldb),
1635 ldb_strerror(ret2));
1638 * Don't overwrite the original failure code
1639 * if there was one
1641 if (ret == LDB_SUCCESS) {
1642 ret = ret2;
1646 ret2 = partition_metadata_read_unlock(module);
1649 * Don't overwrite the original failure code
1650 * if there was one
1652 if (ret == LDB_SUCCESS) {
1653 ret = ret2;
1656 return ret;
1659 /* extended */
1660 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1662 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1663 struct partition_private_data);
1664 struct partition_context *ac;
1665 int ret;
1667 /* if we aren't initialised yet go further */
1668 if (!data) {
1669 return ldb_next_request(module, req);
1672 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1673 /* Update the metadata.tdb to increment the schema version if needed*/
1674 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1675 ret = partition_metadata_inc_schema_sequence(module);
1676 return ldb_module_done(req, NULL, NULL, ret);
1679 if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1680 return partition_sequence_number(module, req);
1683 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1684 return partition_create(module, req);
1688 * as the extended operation has no dn
1689 * we need to send it to all partitions
1692 ac = partition_init_ctx(module, req);
1693 if (!ac) {
1694 return ldb_operr(ldb_module_get_ctx(module));
1697 return partition_send_all(module, ac, req);
1700 static const struct ldb_module_ops ldb_partition_module_ops = {
1701 .name = "partition",
1702 .init_context = partition_init,
1703 .search = partition_search,
1704 .add = partition_add,
1705 .modify = partition_modify,
1706 .del = partition_delete,
1707 .rename = partition_rename,
1708 .extended = partition_extended,
1709 .start_transaction = partition_start_trans,
1710 .prepare_commit = partition_prepare_commit,
1711 .end_transaction = partition_end_trans,
1712 .del_transaction = partition_del_trans,
1713 .read_lock = partition_read_lock,
1714 .read_unlock = partition_read_unlock
1717 int ldb_partition_module_init(const char *version)
1719 LDB_MODULE_CHECK_VERSION(version);
1720 return ldb_register_module(&ldb_partition_module_ops);