s4:dsdb: Fix warnings about not set / set but unused / shadowed variables
[Samba/gebeck_regimport.git] / source4 / dsdb / samdb / ldb_modules / partition.c
blob63835d8d627fde306dcdbcf2242c653439571258
1 /*
2 Partitions ldb module
4 Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5 Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 * Name: ldb
24 * Component: ldb partitions module
26 * Description: Implement LDAP partitions
28 * Author: Andrew Bartlett
29 * Author: Stefan Metzmacher
32 #include "dsdb/samdb/ldb_modules/partition.h"
34 struct part_request {
35 struct ldb_module *module;
36 struct ldb_request *req;
39 struct partition_context {
40 struct ldb_module *module;
41 struct ldb_request *req;
43 struct part_request *part_req;
44 unsigned int num_requests;
45 unsigned int finished_requests;
47 const char **referrals;
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
52 struct partition_context *ac;
54 ac = talloc_zero(req, struct partition_context);
55 if (ac == NULL) {
56 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57 return NULL;
60 ac->module = module;
61 ac->req = req;
63 return ac;
67 * helper functions to call the next module in chain
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
71 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72 const struct dsdb_control_current_partition *partition = NULL;
73 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74 if (partition_ctrl) {
75 partition = talloc_get_type(partition_ctrl->data,
76 struct dsdb_control_current_partition);
79 if (partition != NULL) {
80 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81 ldb_dn_get_linearized(partition->dn));
82 } else {
83 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
87 return ldb_next_request(module, request);
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91 struct ldb_dn *dn,
92 struct ldb_request *req)
94 unsigned int i;
95 struct ldb_control *partition_ctrl;
97 /* see if the request has the partition DN specified in a
98 * control. The repl_meta_data module can specify this to
99 * ensure that replication happens to the right partition
101 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102 if (partition_ctrl) {
103 const struct dsdb_control_current_partition *partition;
104 partition = talloc_get_type(partition_ctrl->data,
105 struct dsdb_control_current_partition);
106 if (partition != NULL) {
107 dn = partition->dn;
111 if (dn == NULL) {
112 return NULL;
115 /* Look at base DN */
116 /* Figure out which partition it is under */
117 /* Skip the lot if 'data' isn't here yet (initialisation) */
118 for (i=0; data && data->partitions && data->partitions[i]; i++) {
119 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120 return data->partitions[i];
124 return NULL;
128 * fire the caller's callback for every entry, but only send 'done' once.
130 static int partition_req_callback(struct ldb_request *req,
131 struct ldb_reply *ares)
133 struct partition_context *ac;
134 struct ldb_module *module;
135 struct ldb_request *nreq;
136 int ret;
137 struct ldb_control *partition_ctrl;
139 ac = talloc_get_type(req->context, struct partition_context);
141 if (!ares) {
142 return ldb_module_done(ac->req, NULL, NULL,
143 LDB_ERR_OPERATIONS_ERROR);
146 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147 if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148 /* If we didn't fan this request out to mulitple partitions,
149 * or this is an individual search result, we can
150 * deterministically tell the caller what partition this was
151 * written to (repl_meta_data likes to know) */
152 ret = ldb_reply_add_control(ares,
153 DSDB_CONTROL_CURRENT_PARTITION_OID,
154 false, partition_ctrl->data);
155 if (ret != LDB_SUCCESS) {
156 return ldb_module_done(ac->req, NULL, NULL,
157 ret);
161 if (ares->error != LDB_SUCCESS) {
162 return ldb_module_done(ac->req, ares->controls,
163 ares->response, ares->error);
166 switch (ares->type) {
167 case LDB_REPLY_REFERRAL:
168 return ldb_module_send_referral(ac->req, ares->referral);
170 case LDB_REPLY_ENTRY:
171 if (ac->req->operation != LDB_SEARCH) {
172 ldb_set_errstring(ldb_module_get_ctx(ac->module),
173 "partition_req_callback:"
174 " Unsupported reply type for this request");
175 return ldb_module_done(ac->req, NULL, NULL,
176 LDB_ERR_OPERATIONS_ERROR);
179 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
181 case LDB_REPLY_DONE:
182 if (ac->req->operation == LDB_EXTENDED) {
183 /* FIXME: check for ares->response, replmd does not fill it ! */
184 if (ares->response) {
185 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186 ldb_set_errstring(ldb_module_get_ctx(ac->module),
187 "partition_req_callback:"
188 " Unknown extended reply, "
189 "only supports START_TLS");
190 talloc_free(ares);
191 return ldb_module_done(ac->req, NULL, NULL,
192 LDB_ERR_OPERATIONS_ERROR);
197 ac->finished_requests++;
198 if (ac->finished_requests == ac->num_requests) {
199 /* Send back referrals if they do exist (search ops) */
200 if (ac->referrals != NULL) {
201 const char **ref;
202 for (ref = ac->referrals; *ref != NULL; ++ref) {
203 ret = ldb_module_send_referral(ac->req,
204 talloc_strdup(ac->req, *ref));
205 if (ret != LDB_SUCCESS) {
206 return ldb_module_done(ac->req, NULL, NULL,
207 ret);
212 /* this was the last one, call callback */
213 return ldb_module_done(ac->req, ares->controls,
214 ares->response,
215 ares->error);
218 /* not the last, now call the next one */
219 module = ac->part_req[ac->finished_requests].module;
220 nreq = ac->part_req[ac->finished_requests].req;
222 ret = partition_request(module, nreq);
223 if (ret != LDB_SUCCESS) {
224 talloc_free(ares);
225 return ldb_module_done(ac->req, NULL, NULL, ret);
228 break;
231 talloc_free(ares);
232 return LDB_SUCCESS;
235 static int partition_prep_request(struct partition_context *ac,
236 struct dsdb_partition *partition)
238 int ret;
239 struct ldb_request *req;
240 struct ldb_control *partition_ctrl = NULL;
242 ac->part_req = talloc_realloc(ac, ac->part_req,
243 struct part_request,
244 ac->num_requests + 1);
245 if (ac->part_req == NULL) {
246 return ldb_oom(ldb_module_get_ctx(ac->module));
249 switch (ac->req->operation) {
250 case LDB_SEARCH:
251 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
252 ac->part_req,
253 ac->req->op.search.base,
254 ac->req->op.search.scope,
255 ac->req->op.search.tree,
256 ac->req->op.search.attrs,
257 ac->req->controls,
258 ac, partition_req_callback,
259 ac->req);
260 LDB_REQ_SET_LOCATION(req);
261 break;
262 case LDB_ADD:
263 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
264 ac->req->op.add.message,
265 ac->req->controls,
266 ac, partition_req_callback,
267 ac->req);
268 LDB_REQ_SET_LOCATION(req);
269 break;
270 case LDB_MODIFY:
271 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
272 ac->req->op.mod.message,
273 ac->req->controls,
274 ac, partition_req_callback,
275 ac->req);
276 LDB_REQ_SET_LOCATION(req);
277 break;
278 case LDB_DELETE:
279 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
280 ac->req->op.del.dn,
281 ac->req->controls,
282 ac, partition_req_callback,
283 ac->req);
284 LDB_REQ_SET_LOCATION(req);
285 break;
286 case LDB_RENAME:
287 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
288 ac->req->op.rename.olddn,
289 ac->req->op.rename.newdn,
290 ac->req->controls,
291 ac, partition_req_callback,
292 ac->req);
293 LDB_REQ_SET_LOCATION(req);
294 break;
295 case LDB_EXTENDED:
296 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
297 ac->part_req,
298 ac->req->op.extended.oid,
299 ac->req->op.extended.data,
300 ac->req->controls,
301 ac, partition_req_callback,
302 ac->req);
303 LDB_REQ_SET_LOCATION(req);
304 break;
305 default:
306 ldb_set_errstring(ldb_module_get_ctx(ac->module),
307 "Unsupported request type!");
308 ret = LDB_ERR_UNWILLING_TO_PERFORM;
311 if (ret != LDB_SUCCESS) {
312 return ret;
315 ac->part_req[ac->num_requests].req = req;
317 if (ac->req->controls) {
318 /* Duplicate everything beside the current partition control */
319 partition_ctrl = ldb_request_get_control(ac->req,
320 DSDB_CONTROL_CURRENT_PARTITION_OID);
321 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
322 return ldb_module_oom(ac->module);
326 if (partition) {
327 void *part_data = partition->ctrl;
329 ac->part_req[ac->num_requests].module = partition->module;
331 if (partition_ctrl != NULL) {
332 if (partition_ctrl->data != NULL) {
333 part_data = partition_ctrl->data;
337 * If the provided current partition control is without
338 * data then use the calculated one.
340 ret = ldb_request_add_control(req,
341 DSDB_CONTROL_CURRENT_PARTITION_OID,
342 false, part_data);
343 if (ret != LDB_SUCCESS) {
344 return ret;
348 if (req->operation == LDB_SEARCH) {
349 /* If the search is for 'more' than this partition,
350 * then change the basedn, so a remote LDAP server
351 * doesn't object */
352 if (ldb_dn_compare_base(partition->ctrl->dn,
353 req->op.search.base) != 0) {
354 req->op.search.base = partition->ctrl->dn;
358 } else {
359 /* make sure you put the module here, or
360 * or ldb_next_request() will skip a module */
361 ac->part_req[ac->num_requests].module = ac->module;
364 ac->num_requests++;
366 return LDB_SUCCESS;
369 static int partition_call_first(struct partition_context *ac)
371 return partition_request(ac->part_req[0].module, ac->part_req[0].req);
375 * Send a request down to all the partitions
377 static int partition_send_all(struct ldb_module *module,
378 struct partition_context *ac,
379 struct ldb_request *req)
381 unsigned int i;
382 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
383 struct partition_private_data);
384 int ret = partition_prep_request(ac, NULL);
385 if (ret != LDB_SUCCESS) {
386 return ret;
388 for (i=0; data && data->partitions && data->partitions[i]; i++) {
389 ret = partition_prep_request(ac, data->partitions[i]);
390 if (ret != LDB_SUCCESS) {
391 return ret;
395 /* fire the first one */
396 return partition_call_first(ac);
401 * send an operation to the top partition, then copy the resulting
402 * object to all other partitions
404 static int partition_copy_all(struct ldb_module *module,
405 struct partition_context *ac,
406 struct ldb_request *req,
407 struct ldb_dn *dn)
409 unsigned int i;
410 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
411 struct partition_private_data);
412 int ret, search_ret;
413 struct ldb_result *res;
415 /* do the request on the top level sam.ldb synchronously */
416 ret = ldb_next_request(module, req);
417 if (ret != LDB_SUCCESS) {
418 return ret;
420 ret = ldb_wait(req->handle, LDB_WAIT_ALL);
421 if (ret != LDB_SUCCESS) {
422 return ret;
425 /* now fetch the resulting object, and then copy it to all the
426 * other partitions. We need this approach to cope with the
427 * partitions getting out of sync. If for example the
428 * @ATTRIBUTES object exists on one partition but not the
429 * others then just doing each of the partitions in turn will
430 * lead to an error
432 search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
433 if (search_ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
434 return search_ret;
437 /* now delete the object in the other partitions. Once that is
438 done we will re-add the object, if search_ret was not
439 LDB_ERR_NO_SUCH_OBJECT
441 for (i=0; data->partitions && data->partitions[i]; i++) {
442 int pret;
443 pret = dsdb_module_del(data->partitions[i]->module, dn, DSDB_FLAG_NEXT_MODULE, req);
444 if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
445 /* we should only get success or no
446 such object from the other partitions */
447 return pret;
452 if (search_ret != LDB_ERR_NO_SUCH_OBJECT) {
453 /* now re-add in the other partitions */
454 for (i=0; data->partitions && data->partitions[i]; i++) {
455 int pret;
456 pret = dsdb_module_add(data->partitions[i]->module, res->msgs[0], DSDB_FLAG_NEXT_MODULE, req);
457 if (pret != LDB_SUCCESS) {
458 return pret;
463 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
467 * Figure out which backend a request needs to be aimed at. Some
468 * requests must be replicated to all backends
470 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
472 struct partition_context *ac;
473 unsigned int i;
474 int ret;
475 struct dsdb_partition *partition;
476 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
477 struct partition_private_data);
479 /* if we aren't initialised yet go further */
480 if (!data || !data->partitions) {
481 return ldb_next_request(module, req);
484 if (ldb_dn_is_special(dn)) {
485 /* Is this a special DN, we need to replicate to every backend? */
486 for (i=0; data->replicate && data->replicate[i]; i++) {
487 if (ldb_dn_compare(data->replicate[i],
488 dn) == 0) {
490 ac = partition_init_ctx(module, req);
491 if (!ac) {
492 return ldb_operr(ldb_module_get_ctx(module));
495 return partition_copy_all(module, ac, req, dn);
500 /* Otherwise, we need to find the partition to fire it to */
502 /* Find partition */
503 partition = find_partition(data, dn, req);
504 if (!partition) {
506 * if we haven't found a matching partition
507 * pass the request to the main ldb
509 * TODO: we should maybe return an error here
510 * if it's not a special dn
513 return ldb_next_request(module, req);
516 ac = partition_init_ctx(module, req);
517 if (!ac) {
518 return ldb_operr(ldb_module_get_ctx(module));
521 /* we need to add a control but we never touch the original request */
522 ret = partition_prep_request(ac, partition);
523 if (ret != LDB_SUCCESS) {
524 return ret;
527 /* fire the first one */
528 return partition_call_first(ac);
531 /* search */
532 static int partition_search(struct ldb_module *module, struct ldb_request *req)
534 struct ldb_control **saved_controls;
535 /* Find backend */
536 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
537 struct partition_private_data);
538 struct partition_context *ac;
539 struct ldb_context *ldb;
540 struct loadparm_context *lp_ctx;
542 struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
543 struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
544 struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
546 struct ldb_search_options_control *search_options = NULL;
547 struct dsdb_partition *p;
548 unsigned int i, j;
549 int ret;
550 bool domain_scope = false, phantom_root = false;
552 p = find_partition(data, NULL, req);
553 if (p != NULL) {
554 /* the caller specified what partition they want the
555 * search - just pass it on
557 return ldb_next_request(p->module, req);
560 /* Get back the search options from the search control, and mark it as
561 * non-critical (to make backends and also dcpromo happy).
563 if (search_control) {
564 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
565 search_control->critical = 0;
569 /* Remove the "domain_scope" control, so we don't confuse a backend
570 * server */
571 if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
572 return ldb_oom(ldb_module_get_ctx(module));
575 /* if we aren't initialised yet go further */
576 if (!data || !data->partitions) {
577 return ldb_next_request(module, req);
580 /* Special DNs without specified partition should go further */
581 if (ldb_dn_is_special(req->op.search.base)) {
582 return ldb_next_request(module, req);
585 /* Locate the options */
586 domain_scope = (search_options
587 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
588 || domain_scope_control;
589 phantom_root = search_options
590 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
592 /* Remove handled options from the search control flag */
593 if (search_options) {
594 search_options->search_options = search_options->search_options
595 & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
596 & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
599 ac = partition_init_ctx(module, req);
600 if (!ac) {
601 return ldb_operr(ldb_module_get_ctx(module));
604 ldb = ldb_module_get_ctx(ac->module);
605 lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
606 struct loadparm_context);
608 /* Search from the base DN */
609 if (ldb_dn_is_null(req->op.search.base)) {
610 if (!phantom_root) {
611 return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
613 return partition_send_all(module, ac, req);
616 for (i=0; data->partitions[i]; i++) {
617 bool match = false, stop = false;
619 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
620 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
621 req->op.search.base) == 0) {
622 /* base DN is in a partial replica
623 with the NO_GLOBAL_CATALOG
624 control. This partition is invisible */
625 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
626 continue;
630 if (phantom_root) {
631 /* Phantom root: Find all partitions under the
632 * search base. We match if:
634 * 1) the DN we are looking for exactly matches a
635 * certain partition and always stop
636 * 2) the DN we are looking for is a parent of certain
637 * partitions and it isn't a scope base search
638 * 3) the DN we are looking for is a child of a certain
639 * partition and always stop
640 * - we don't need to go any further up in the
641 * hierarchy!
643 if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
644 req->op.search.base) == 0) {
645 match = true;
646 stop = true;
648 if (!match &&
649 (ldb_dn_compare_base(req->op.search.base,
650 data->partitions[i]->ctrl->dn) == 0 &&
651 req->op.search.scope != LDB_SCOPE_BASE)) {
652 match = true;
654 if (!match &&
655 ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
656 req->op.search.base) == 0) {
657 match = true;
658 stop = true; /* note that this relies on partition ordering */
660 } else {
661 /* Domain scope: Find all partitions under the search
662 * base.
664 * We generate referral candidates if we haven't
665 * specified the domain scope control, haven't a base
666 * search* scope and the DN we are looking for is a real
667 * predecessor of certain partitions. When a new
668 * referral candidate is nearer to the DN than an
669 * existing one delete the latter (we want to have only
670 * the closest ones). When we checked this for all
671 * candidates we have the final referrals.
673 * We match if the DN we are looking for is a child of
674 * a certain partition or the partition
675 * DN itself - we don't need to go any further
676 * up in the hierarchy!
678 if ((!domain_scope) &&
679 (req->op.search.scope != LDB_SCOPE_BASE) &&
680 (ldb_dn_compare_base(req->op.search.base,
681 data->partitions[i]->ctrl->dn) == 0) &&
682 (ldb_dn_compare(req->op.search.base,
683 data->partitions[i]->ctrl->dn) != 0)) {
684 char *ref = talloc_asprintf(ac,
685 "ldap://%s/%s%s",
686 lpcfg_dnsdomain(lp_ctx),
687 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
688 req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
690 if (ref == NULL) {
691 return ldb_oom(ldb);
694 /* Initialise the referrals list */
695 if (ac->referrals == NULL) {
696 ac->referrals = (const char **) str_list_make_empty(ac);
697 if (ac->referrals == NULL) {
698 return ldb_oom(ldb);
702 /* Check if the new referral candidate is
703 * closer to the base DN than already
704 * saved ones and delete the latters */
705 j = 0;
706 while (ac->referrals[j] != NULL) {
707 if (strstr(ac->referrals[j],
708 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
709 str_list_remove(ac->referrals,
710 ac->referrals[j]);
711 } else {
712 ++j;
716 /* Add our new candidate */
717 ac->referrals = str_list_add(ac->referrals, ref);
719 talloc_free(ref);
721 if (ac->referrals == NULL) {
722 return ldb_oom(ldb);
725 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
726 match = true;
727 stop = true; /* note that this relies on partition ordering */
731 if (match) {
732 ret = partition_prep_request(ac, data->partitions[i]);
733 if (ret != LDB_SUCCESS) {
734 return ret;
738 if (stop) break;
741 /* Perhaps we didn't match any partitions. Try the main partition */
742 if (ac->num_requests == 0) {
743 talloc_free(ac);
744 return ldb_next_request(module, req);
747 /* fire the first one */
748 return partition_call_first(ac);
751 /* add */
752 static int partition_add(struct ldb_module *module, struct ldb_request *req)
754 return partition_replicate(module, req, req->op.add.message->dn);
757 /* modify */
758 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
760 return partition_replicate(module, req, req->op.mod.message->dn);
763 /* delete */
764 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
766 return partition_replicate(module, req, req->op.del.dn);
769 /* rename */
770 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
772 /* Find backend */
773 struct dsdb_partition *backend, *backend2;
775 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
776 struct partition_private_data);
778 /* Skip the lot if 'data' isn't here yet (initialisation) */
779 if (!data) {
780 return ldb_operr(ldb_module_get_ctx(module));
783 backend = find_partition(data, req->op.rename.olddn, req);
784 backend2 = find_partition(data, req->op.rename.newdn, req);
786 if ((backend && !backend2) || (!backend && backend2)) {
787 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
790 if (backend != backend2) {
791 ldb_asprintf_errstring(ldb_module_get_ctx(module),
792 "Cannot rename from %s in %s to %s in %s: %s",
793 ldb_dn_get_linearized(req->op.rename.olddn),
794 ldb_dn_get_linearized(backend->ctrl->dn),
795 ldb_dn_get_linearized(req->op.rename.newdn),
796 ldb_dn_get_linearized(backend2->ctrl->dn),
797 ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
798 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
801 return partition_replicate(module, req, req->op.rename.olddn);
804 /* start a transaction */
805 static int partition_start_trans(struct ldb_module *module)
807 unsigned int i;
808 int ret;
809 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
810 struct partition_private_data);
811 /* Look at base DN */
812 /* Figure out which partition it is under */
813 /* Skip the lot if 'data' isn't here yet (initialization) */
814 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
815 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
817 ret = ldb_next_start_trans(module);
818 if (ret != LDB_SUCCESS) {
819 return ret;
822 ret = partition_reload_if_required(module, data, NULL);
823 if (ret != LDB_SUCCESS) {
824 ldb_next_del_trans(module);
825 return ret;
828 ret = partition_metadata_start_trans(module);
829 if (ret != LDB_SUCCESS) {
830 ldb_next_del_trans(module);
831 return ret;
834 for (i=0; data && data->partitions && data->partitions[i]; i++) {
835 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
836 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
837 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
839 ret = ldb_next_start_trans(data->partitions[i]->module);
840 if (ret != LDB_SUCCESS) {
841 /* Back it out, if it fails on one */
842 for (i--; i >= 0; i--) {
843 ldb_next_del_trans(data->partitions[i]->module);
845 ldb_next_del_trans(module);
846 partition_metadata_del_trans(module);
847 return ret;
851 data->in_transaction++;
853 return LDB_SUCCESS;
856 /* prepare for a commit */
857 static int partition_prepare_commit(struct ldb_module *module)
859 unsigned int i;
860 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
861 struct partition_private_data);
862 int ret;
864 for (i=0; data && data->partitions && data->partitions[i]; i++) {
865 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
866 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
867 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
869 ret = ldb_next_prepare_commit(data->partitions[i]->module);
870 if (ret != LDB_SUCCESS) {
871 ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
872 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
873 ldb_errstring(ldb_module_get_ctx(module)));
874 return ret;
878 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
879 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
882 ret = ldb_next_prepare_commit(module);
883 if (ret != LDB_SUCCESS) {
884 return ret;
887 /* metadata prepare commit must come last, as other partitions could modify
888 * the database inside the prepare commit method of a module */
889 return partition_metadata_prepare_commit(module);
893 /* end a transaction */
894 static int partition_end_trans(struct ldb_module *module)
896 int ret, ret2;
897 unsigned int i;
898 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
899 struct partition_private_data);
901 ret = LDB_SUCCESS;
903 if (data->in_transaction == 0) {
904 DEBUG(0,("partition end transaction mismatch\n"));
905 ret = LDB_ERR_OPERATIONS_ERROR;
906 } else {
907 data->in_transaction--;
910 ret2 = partition_metadata_end_trans(module);
911 if (ret2 != LDB_SUCCESS) {
912 ret = ret2;
915 for (i=0; data && data->partitions && data->partitions[i]; i++) {
916 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
917 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
918 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
920 ret2 = ldb_next_end_trans(data->partitions[i]->module);
921 if (ret2 != LDB_SUCCESS) {
922 ldb_asprintf_errstring(ldb_module_get_ctx(module), "end_trans error on %s: %s",
923 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
924 ldb_errstring(ldb_module_get_ctx(module)));
925 ret = ret2;
929 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
930 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
932 ret2 = ldb_next_end_trans(module);
933 if (ret2 != LDB_SUCCESS) {
934 ret = ret2;
936 return ret;
939 /* delete a transaction */
940 static int partition_del_trans(struct ldb_module *module)
942 int ret, final_ret = LDB_SUCCESS;
943 unsigned int i;
944 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
945 struct partition_private_data);
946 ret = partition_metadata_del_trans(module);
947 if (ret != LDB_SUCCESS) {
948 final_ret = ret;
951 for (i=0; data && data->partitions && data->partitions[i]; i++) {
952 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
953 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
954 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
956 ret = ldb_next_del_trans(data->partitions[i]->module);
957 if (ret != LDB_SUCCESS) {
958 ldb_asprintf_errstring(ldb_module_get_ctx(module), "del_trans error on %s: %s",
959 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
960 ldb_errstring(ldb_module_get_ctx(module)));
961 final_ret = ret;
965 if (data->in_transaction == 0) {
966 DEBUG(0,("partition del transaction mismatch\n"));
967 return ldb_operr(ldb_module_get_ctx(module));
969 data->in_transaction--;
971 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
972 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
974 ret = ldb_next_del_trans(module);
975 if (ret != LDB_SUCCESS) {
976 final_ret = ret;
978 return final_ret;
981 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx,
982 uint64_t *seq_number,
983 struct ldb_request *parent)
985 int ret;
986 struct ldb_result *res;
987 struct ldb_seqnum_request *tseq;
988 struct ldb_seqnum_result *seqr;
990 tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
991 if (tseq == NULL) {
992 return ldb_oom(ldb_module_get_ctx(module));
994 tseq->type = LDB_SEQ_HIGHEST_SEQ;
996 ret = dsdb_module_extended(module, tseq, &res,
997 LDB_EXTENDED_SEQUENCE_NUMBER,
998 tseq,
999 DSDB_FLAG_NEXT_MODULE,
1000 parent);
1001 if (ret != LDB_SUCCESS) {
1002 talloc_free(tseq);
1003 return ret;
1006 seqr = talloc_get_type_abort(res->extended->data,
1007 struct ldb_seqnum_result);
1008 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1009 talloc_free(res);
1010 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1011 "Primary backend in partition module returned a timestamp based seq");
1014 *seq_number = seqr->seq_num;
1015 talloc_free(tseq);
1016 return LDB_SUCCESS;
1021 * Older version of sequence number as sum of sequence numbers for each partition
1023 int partition_sequence_number_from_partitions(struct ldb_module *module,
1024 uint64_t *seqr)
1026 int ret;
1027 unsigned int i;
1028 uint64_t seq_number = 0;
1029 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1030 struct partition_private_data);
1032 ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1033 if (ret != LDB_SUCCESS) {
1034 return ret;
1037 /* Skip the lot if 'data' isn't here yet (initialisation) */
1038 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1039 struct ldb_seqnum_request *tseq;
1040 struct ldb_seqnum_result *tseqr;
1041 struct ldb_request *treq;
1042 struct ldb_result *res = talloc_zero(data, struct ldb_result);
1043 if (res == NULL) {
1044 return ldb_oom(ldb_module_get_ctx(module));
1046 tseq = talloc_zero(res, struct ldb_seqnum_request);
1047 if (tseq == NULL) {
1048 talloc_free(res);
1049 return ldb_oom(ldb_module_get_ctx(module));
1051 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1053 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1054 LDB_EXTENDED_SEQUENCE_NUMBER,
1055 tseq,
1056 NULL,
1057 res,
1058 ldb_extended_default_callback,
1059 NULL);
1060 LDB_REQ_SET_LOCATION(treq);
1061 if (ret != LDB_SUCCESS) {
1062 talloc_free(res);
1063 return ret;
1066 ret = partition_request(data->partitions[i]->module, treq);
1067 if (ret != LDB_SUCCESS) {
1068 talloc_free(res);
1069 return ret;
1071 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1072 if (ret != LDB_SUCCESS) {
1073 talloc_free(res);
1074 return ret;
1076 tseqr = talloc_get_type(res->extended->data,
1077 struct ldb_seqnum_result);
1078 seq_number += tseqr->seq_num;
1079 talloc_free(res);
1082 *seqr = seq_number;
1083 return LDB_SUCCESS;
1088 * Newer version of sequence number using metadata tdb
1090 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1092 struct ldb_extended *ext;
1093 struct ldb_seqnum_request *seq;
1094 struct ldb_seqnum_result *seqr;
1095 uint64_t seq_number;
1096 int ret;
1098 seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1099 switch (seq->type) {
1100 case LDB_SEQ_NEXT:
1101 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1102 if (ret != LDB_SUCCESS) {
1103 return ret;
1105 break;
1107 case LDB_SEQ_HIGHEST_SEQ:
1108 ret = partition_metadata_sequence_number(module, &seq_number);
1109 if (ret != LDB_SUCCESS) {
1110 return ret;
1112 break;
1114 case LDB_SEQ_HIGHEST_TIMESTAMP:
1115 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1116 "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1119 ext = talloc_zero(req, struct ldb_extended);
1120 if (!ext) {
1121 return ldb_module_oom(module);
1123 seqr = talloc_zero(ext, struct ldb_seqnum_result);
1124 if (seqr == NULL) {
1125 talloc_free(ext);
1126 return ldb_module_oom(module);
1128 ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1129 ext->data = seqr;
1131 seqr->seq_num = seq_number;
1132 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1134 /* send request done */
1135 return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1138 /* extended */
1139 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1141 struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1142 struct partition_private_data);
1143 struct partition_context *ac;
1144 int ret;
1146 /* if we aren't initialised yet go further */
1147 if (!data) {
1148 return ldb_next_request(module, req);
1151 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1152 /* Update the metadata.tdb to increment the schema version if needed*/
1153 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1154 ret = partition_metadata_inc_schema_sequence(module);
1155 return ldb_module_done(req, NULL, NULL, ret);
1158 if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1159 return partition_sequence_number(module, req);
1162 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1163 return partition_create(module, req);
1167 * as the extended operation has no dn
1168 * we need to send it to all partitions
1171 ac = partition_init_ctx(module, req);
1172 if (!ac) {
1173 return ldb_operr(ldb_module_get_ctx(module));
1176 return partition_send_all(module, ac, req);
1179 static const struct ldb_module_ops ldb_partition_module_ops = {
1180 .name = "partition",
1181 .init_context = partition_init,
1182 .search = partition_search,
1183 .add = partition_add,
1184 .modify = partition_modify,
1185 .del = partition_delete,
1186 .rename = partition_rename,
1187 .extended = partition_extended,
1188 .start_transaction = partition_start_trans,
1189 .prepare_commit = partition_prepare_commit,
1190 .end_transaction = partition_end_trans,
1191 .del_transaction = partition_del_trans,
1194 int ldb_partition_module_init(const char *version)
1196 LDB_MODULE_CHECK_VERSION(version);
1197 return ldb_register_module(&ldb_partition_module_ops);