s4:resolve_oids LDB module - not really a change but a nicer method to call "talloc_r...
[Samba/nascimento.git] / source4 / dsdb / samdb / ldb_modules / partition.c
blob19bd03633a8b1340623b475ffa11a76bab109f50
1 /*
2 Partitions ldb module
4 Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5 Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
22 * Name: ldb
24 * Component: ldb partitions module
26 * Description: Implement LDAP partitions
28 * Author: Andrew Bartlett
29 * Author: Stefan Metzmacher
32 #include "dsdb/samdb/ldb_modules/partition.h"
34 struct part_request {
35 struct ldb_module *module;
36 struct ldb_request *req;
39 struct partition_context {
40 struct ldb_module *module;
41 struct ldb_request *req;
43 struct part_request *part_req;
44 unsigned int num_requests;
45 unsigned int finished_requests;
47 const char **referrals;
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
52 struct partition_context *ac;
54 ac = talloc_zero(req, struct partition_context);
55 if (ac == NULL) {
56 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57 return NULL;
60 ac->module = module;
61 ac->req = req;
63 return ac;
67 * helper functions to call the next module in chain
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
71 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
72 const struct dsdb_control_current_partition *partition = NULL;
73 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74 if (partition_ctrl) {
75 partition = talloc_get_type(partition_ctrl->data,
76 struct dsdb_control_current_partition);
79 if (partition != NULL) {
80 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s",
81 ldb_dn_get_linearized(partition->dn));
82 } else {
83 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
87 return ldb_next_request(module, request);
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91 struct ldb_dn *dn,
92 struct ldb_request *req)
94 unsigned int i;
95 struct ldb_control *partition_ctrl;
97 /* see if the request has the partition DN specified in a
98 * control. The repl_meta_data module can specify this to
99 * ensure that replication happens to the right partition
101 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102 if (partition_ctrl) {
103 const struct dsdb_control_current_partition *partition;
104 partition = talloc_get_type(partition_ctrl->data,
105 struct dsdb_control_current_partition);
106 if (partition != NULL) {
107 dn = partition->dn;
111 if (dn == NULL) {
112 return NULL;
115 /* Look at base DN */
116 /* Figure out which partition it is under */
117 /* Skip the lot if 'data' isn't here yet (initialisation) */
118 for (i=0; data && data->partitions && data->partitions[i]; i++) {
119 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120 return data->partitions[i];
124 return NULL;
128 * fire the caller's callback for every entry, but only send 'done' once.
130 static int partition_req_callback(struct ldb_request *req,
131 struct ldb_reply *ares)
133 struct partition_context *ac;
134 struct ldb_module *module;
135 struct ldb_request *nreq;
136 int ret;
137 struct partition_private_data *data;
138 struct ldb_control *partition_ctrl;
140 ac = talloc_get_type(req->context, struct partition_context);
141 data = talloc_get_type(ac->module->private_data, struct partition_private_data);
143 if (!ares) {
144 return ldb_module_done(ac->req, NULL, NULL,
145 LDB_ERR_OPERATIONS_ERROR);
148 partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
149 if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
150 /* If we didn't fan this request out to mulitple partitions,
151 * or this is an individual search result, we can
152 * deterministily tell the caller what partition this was
153 * written to (repl_meta_data likes to know) */
154 ret = ldb_reply_add_control(ares,
155 DSDB_CONTROL_CURRENT_PARTITION_OID,
156 false, partition_ctrl->data);
157 if (ret != LDB_SUCCESS) {
158 return ldb_module_done(ac->req, NULL, NULL,
159 ret);
163 if (ares->error != LDB_SUCCESS) {
164 return ldb_module_done(ac->req, ares->controls,
165 ares->response, ares->error);
168 switch (ares->type) {
169 case LDB_REPLY_REFERRAL:
170 /* ignore referrals for now */
171 break;
173 case LDB_REPLY_ENTRY:
174 if (ac->req->operation != LDB_SEARCH) {
175 ldb_set_errstring(ldb_module_get_ctx(ac->module),
176 "partition_req_callback:"
177 " Unsupported reply type for this request");
178 return ldb_module_done(ac->req, NULL, NULL,
179 LDB_ERR_OPERATIONS_ERROR);
182 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
184 case LDB_REPLY_DONE:
185 if (ac->req->operation == LDB_EXTENDED) {
186 /* FIXME: check for ares->response, replmd does not fill it ! */
187 if (ares->response) {
188 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
189 ldb_set_errstring(ldb_module_get_ctx(ac->module),
190 "partition_req_callback:"
191 " Unknown extended reply, "
192 "only supports START_TLS");
193 talloc_free(ares);
194 return ldb_module_done(ac->req, NULL, NULL,
195 LDB_ERR_OPERATIONS_ERROR);
200 ac->finished_requests++;
201 if (ac->finished_requests == ac->num_requests) {
202 /* Send back referrals if they do exist (search ops) */
203 if (ac->referrals != NULL) {
204 const char **ref;
205 for (ref = ac->referrals; *ref != NULL; ++ref) {
206 ret = ldb_module_send_referral(ac->req,
207 talloc_strdup(ac->req, *ref));
208 if (ret != LDB_SUCCESS) {
209 return ldb_module_done(ac->req, NULL, NULL,
210 ret);
215 /* this was the last one, call callback */
216 return ldb_module_done(ac->req, ares->controls,
217 ares->response,
218 ares->error);
221 /* not the last, now call the next one */
222 module = ac->part_req[ac->finished_requests].module;
223 nreq = ac->part_req[ac->finished_requests].req;
225 ret = partition_request(module, nreq);
226 if (ret != LDB_SUCCESS) {
227 talloc_free(ares);
228 return ldb_module_done(ac->req, NULL, NULL, ret);
231 break;
234 talloc_free(ares);
235 return LDB_SUCCESS;
238 static int partition_prep_request(struct partition_context *ac,
239 struct dsdb_partition *partition)
241 int ret;
242 struct ldb_request *req;
244 ac->part_req = talloc_realloc(ac, ac->part_req,
245 struct part_request,
246 ac->num_requests + 1);
247 if (ac->part_req == NULL) {
248 ldb_oom(ldb_module_get_ctx(ac->module));
249 return LDB_ERR_OPERATIONS_ERROR;
252 switch (ac->req->operation) {
253 case LDB_SEARCH:
254 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
255 ac->part_req,
256 ac->req->op.search.base,
257 ac->req->op.search.scope,
258 ac->req->op.search.tree,
259 ac->req->op.search.attrs,
260 ac->req->controls,
261 ac, partition_req_callback,
262 ac->req);
263 break;
264 case LDB_ADD:
265 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
266 ac->req->op.add.message,
267 ac->req->controls,
268 ac, partition_req_callback,
269 ac->req);
270 break;
271 case LDB_MODIFY:
272 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273 ac->req->op.mod.message,
274 ac->req->controls,
275 ac, partition_req_callback,
276 ac->req);
277 break;
278 case LDB_DELETE:
279 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
280 ac->req->op.del.dn,
281 ac->req->controls,
282 ac, partition_req_callback,
283 ac->req);
284 break;
285 case LDB_RENAME:
286 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
287 ac->req->op.rename.olddn,
288 ac->req->op.rename.newdn,
289 ac->req->controls,
290 ac, partition_req_callback,
291 ac->req);
292 break;
293 case LDB_EXTENDED:
294 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
295 ac->part_req,
296 ac->req->op.extended.oid,
297 ac->req->op.extended.data,
298 ac->req->controls,
299 ac, partition_req_callback,
300 ac->req);
301 break;
302 default:
303 ldb_set_errstring(ldb_module_get_ctx(ac->module),
304 "Unsupported request type!");
305 ret = LDB_ERR_UNWILLING_TO_PERFORM;
308 if (ret != LDB_SUCCESS) {
309 return ret;
312 ac->part_req[ac->num_requests].req = req;
314 if (ac->req->controls) {
315 req->controls = talloc_memdup(req, ac->req->controls,
316 talloc_get_size(ac->req->controls));
317 if (req->controls == NULL) {
318 ldb_oom(ldb_module_get_ctx(ac->module));
319 return LDB_ERR_OPERATIONS_ERROR;
323 if (partition) {
324 ac->part_req[ac->num_requests].module = partition->module;
326 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
327 ret = ldb_request_add_control(req,
328 DSDB_CONTROL_CURRENT_PARTITION_OID,
329 false, partition->ctrl);
330 if (ret != LDB_SUCCESS) {
331 return ret;
335 if (req->operation == LDB_SEARCH) {
336 /* If the search is for 'more' than this partition,
337 * then change the basedn, so a remote LDAP server
338 * doesn't object */
339 if (ldb_dn_compare_base(partition->ctrl->dn,
340 req->op.search.base) != 0) {
341 req->op.search.base = partition->ctrl->dn;
345 } else {
346 /* make sure you put the module here, or
347 * or ldb_next_request() will skip a module */
348 ac->part_req[ac->num_requests].module = ac->module;
351 ac->num_requests++;
353 return LDB_SUCCESS;
356 static int partition_call_first(struct partition_context *ac)
358 return partition_request(ac->part_req[0].module, ac->part_req[0].req);
362 * Send a request down to all the partitions
364 static int partition_send_all(struct ldb_module *module,
365 struct partition_context *ac,
366 struct ldb_request *req)
368 unsigned int i;
369 struct partition_private_data *data = talloc_get_type(module->private_data,
370 struct partition_private_data);
371 int ret = partition_prep_request(ac, NULL);
372 if (ret != LDB_SUCCESS) {
373 return ret;
375 for (i=0; data && data->partitions && data->partitions[i]; i++) {
376 ret = partition_prep_request(ac, data->partitions[i]);
377 if (ret != LDB_SUCCESS) {
378 return ret;
382 /* fire the first one */
383 return partition_call_first(ac);
387 * Figure out which backend a request needs to be aimed at. Some
388 * requests must be replicated to all backends
390 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn)
392 struct partition_context *ac;
393 unsigned int i;
394 int ret;
395 struct dsdb_partition *partition;
396 struct partition_private_data *data = talloc_get_type(module->private_data,
397 struct partition_private_data);
398 if (!data || !data->partitions) {
399 return ldb_next_request(module, req);
402 if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
403 /* Is this a special DN, we need to replicate to every backend? */
404 for (i=0; data->replicate && data->replicate[i]; i++) {
405 if (ldb_dn_compare(data->replicate[i],
406 dn) == 0) {
408 ac = partition_init_ctx(module, req);
409 if (!ac) {
410 return LDB_ERR_OPERATIONS_ERROR;
413 return partition_send_all(module, ac, req);
418 /* Otherwise, we need to find the partition to fire it to */
420 /* Find partition */
421 partition = find_partition(data, dn, req);
422 if (!partition) {
424 * if we haven't found a matching partition
425 * pass the request to the main ldb
427 * TODO: we should maybe return an error here
428 * if it's not a special dn
431 return ldb_next_request(module, req);
434 ac = partition_init_ctx(module, req);
435 if (!ac) {
436 return LDB_ERR_OPERATIONS_ERROR;
439 /* we need to add a control but we never touch the original request */
440 ret = partition_prep_request(ac, partition);
441 if (ret != LDB_SUCCESS) {
442 return ret;
445 /* fire the first one */
446 return partition_call_first(ac);
449 /* search */
450 static int partition_search(struct ldb_module *module, struct ldb_request *req)
452 struct ldb_control **saved_controls;
453 /* Find backend */
454 struct partition_private_data *data = talloc_get_type(module->private_data,
455 struct partition_private_data);
456 struct partition_context *ac;
457 struct ldb_context *ldb;
458 struct loadparm_context *lp_ctx;
460 struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
461 struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
463 struct ldb_search_options_control *search_options = NULL;
464 struct dsdb_partition *p;
465 unsigned int i, j;
466 int ret;
467 bool domain_scope = false, phantom_root = false;
469 ret = partition_reload_if_required(module, data);
470 if (ret != LDB_SUCCESS) {
471 return ret;
474 p = find_partition(data, NULL, req);
475 if (p != NULL) {
476 /* the caller specified what partition they want the
477 * search - just pass it on
479 return ldb_next_request(p->module, req);
482 /* Get back the search options from the search control, and mark it as
483 * non-critical (to make backends and also dcpromo happy).
485 if (search_control) {
486 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
487 search_control->critical = 0;
491 /* Remove the "domain_scope" control, so we don't confuse a backend
492 * server */
493 if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
494 ldb_oom(ldb_module_get_ctx(module));
495 return LDB_ERR_OPERATIONS_ERROR;
498 /* Locate the options */
499 domain_scope = (search_options
500 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
501 || domain_scope_control;
502 phantom_root = search_options
503 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
505 /* Remove handled options from the search control flag */
506 if (search_options) {
507 search_options->search_options = search_options->search_options
508 & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
509 & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
512 if (!data || !data->partitions) {
513 return ldb_next_request(module, req);
516 ac = partition_init_ctx(module, req);
517 if (!ac) {
518 return LDB_ERR_OPERATIONS_ERROR;
521 ldb = ldb_module_get_ctx(ac->module);
522 lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
523 struct loadparm_context);
525 /* Search from the base DN */
526 if (ldb_dn_is_null(req->op.search.base)) {
527 return partition_send_all(module, ac, req);
530 for (i=0; data->partitions[i]; i++) {
531 bool match = false, stop = false;
533 if (phantom_root) {
534 /* Phantom root: Find all partitions under the
535 * search base. We match if:
537 * 1) the DN we are looking for exactly matches a
538 * certain partition and always stop
539 * 2) the DN we are looking for is a parent of certain
540 * partitions and it isn't a scope base search
541 * 3) the DN we are looking for is a child of a certain
542 * partition and always stop
543 * - we don't need to go any further up in the
544 * hierarchy!
546 if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
547 req->op.search.base) == 0) {
548 match = true;
549 stop = true;
551 if (!match &&
552 (ldb_dn_compare_base(req->op.search.base,
553 data->partitions[i]->ctrl->dn) == 0 &&
554 req->op.search.scope != LDB_SCOPE_BASE)) {
555 match = true;
557 if (!match &&
558 ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
559 req->op.search.base) == 0) {
560 match = true;
561 stop = true; /* note that this relies on partition ordering */
563 } else {
564 /* Domain scope: Find all partitions under the search
565 * base.
567 * We generate referral candidates if we haven't
568 * specified the domain scope control, haven't a base
569 * search* scope and the DN we are looking for is a real
570 * predecessor of certain partitions. When a new
571 * referral candidate is nearer to the DN than an
572 * existing one delete the latter (we want to have only
573 * the closest ones). When we checked this for all
574 * candidates we have the final referrals.
576 * We match if the DN we are looking for is a child of
577 * a certain partition or the partition
578 * DN itself - we don't need to go any further
579 * up in the hierarchy!
581 if ((!domain_scope) &&
582 (req->op.search.scope != LDB_SCOPE_BASE) &&
583 (ldb_dn_compare_base(req->op.search.base,
584 data->partitions[i]->ctrl->dn) == 0) &&
585 (ldb_dn_compare(req->op.search.base,
586 data->partitions[i]->ctrl->dn) != 0)) {
587 char *ref = talloc_asprintf(ac,
588 "ldap://%s/%s%s",
589 lp_dnsdomain(lp_ctx),
590 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
591 req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
593 if (ref == NULL) {
594 ldb_oom(ldb);
595 return LDB_ERR_OPERATIONS_ERROR;
598 /* Initialise the referrals list */
599 if (ac->referrals == NULL) {
600 ac->referrals = (const char **) str_list_make_empty(ac);
601 if (ac->referrals == NULL) {
602 ldb_oom(ldb);
603 return LDB_ERR_OPERATIONS_ERROR;
607 /* Check if the new referral candidate is
608 * closer to the base DN than already
609 * saved ones and delete the latters */
610 j = 0;
611 while (ac->referrals[j] != NULL) {
612 if (strstr(ac->referrals[j],
613 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
614 str_list_remove(ac->referrals,
615 ac->referrals[j]);
616 } else {
617 ++j;
621 /* Add our new candidate */
622 ac->referrals = str_list_add(ac->referrals, ref);
624 talloc_free(ref);
626 if (ac->referrals == NULL) {
627 ldb_oom(ldb);
628 return LDB_ERR_OPERATIONS_ERROR;
631 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
632 match = true;
633 stop = true; /* note that this relies on partition ordering */
637 if (match) {
638 ret = partition_prep_request(ac, data->partitions[i]);
639 if (ret != LDB_SUCCESS) {
640 return ret;
644 if (stop) break;
647 /* Perhaps we didn't match any partitions. Try the main partition */
648 if (ac->num_requests == 0) {
649 talloc_free(ac);
650 return ldb_next_request(module, req);
653 /* fire the first one */
654 return partition_call_first(ac);
657 /* add */
658 static int partition_add(struct ldb_module *module, struct ldb_request *req)
660 return partition_replicate(module, req, req->op.add.message->dn);
663 /* modify */
664 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
666 return partition_replicate(module, req, req->op.mod.message->dn);
669 /* delete */
670 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
672 return partition_replicate(module, req, req->op.del.dn);
675 /* rename */
676 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
678 /* Find backend */
679 struct dsdb_partition *backend, *backend2;
681 struct partition_private_data *data = talloc_get_type(module->private_data,
682 struct partition_private_data);
684 /* Skip the lot if 'data' isn't here yet (initialisation) */
685 if (!data) {
686 return LDB_ERR_OPERATIONS_ERROR;
689 backend = find_partition(data, req->op.rename.olddn, req);
690 backend2 = find_partition(data, req->op.rename.newdn, req);
692 if ((backend && !backend2) || (!backend && backend2)) {
693 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
696 if (backend != backend2) {
697 ldb_asprintf_errstring(ldb_module_get_ctx(module),
698 "Cannot rename from %s in %s to %s in %s: %s",
699 ldb_dn_get_linearized(req->op.rename.olddn),
700 ldb_dn_get_linearized(backend->ctrl->dn),
701 ldb_dn_get_linearized(req->op.rename.newdn),
702 ldb_dn_get_linearized(backend2->ctrl->dn),
703 ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
704 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
707 return partition_replicate(module, req, req->op.rename.olddn);
710 /* start a transaction */
711 static int partition_start_trans(struct ldb_module *module)
713 unsigned int i;
714 int ret;
715 struct partition_private_data *data = talloc_get_type(module->private_data,
716 struct partition_private_data);
717 /* Look at base DN */
718 /* Figure out which partition it is under */
719 /* Skip the lot if 'data' isn't here yet (initialization) */
720 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
721 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
723 ret = ldb_next_start_trans(module);
724 if (ret != LDB_SUCCESS) {
725 return ret;
728 ret = partition_reload_if_required(module, data);
729 if (ret != LDB_SUCCESS) {
730 return ret;
733 for (i=0; data && data->partitions && data->partitions[i]; i++) {
734 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
735 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
736 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
738 ret = ldb_next_start_trans(data->partitions[i]->module);
739 if (ret != LDB_SUCCESS) {
740 /* Back it out, if it fails on one */
741 for (i--; i >= 0; i--) {
742 ldb_next_del_trans(data->partitions[i]->module);
744 ldb_next_del_trans(module);
745 return ret;
749 data->in_transaction++;
751 return LDB_SUCCESS;
754 /* prepare for a commit */
755 static int partition_prepare_commit(struct ldb_module *module)
757 unsigned int i;
758 struct partition_private_data *data = talloc_get_type(module->private_data,
759 struct partition_private_data);
761 for (i=0; data && data->partitions && data->partitions[i]; i++) {
762 int ret;
764 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
765 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
766 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
768 ret = ldb_next_prepare_commit(data->partitions[i]->module);
769 if (ret != LDB_SUCCESS) {
770 ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s",
771 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
772 ldb_errstring(module->ldb));
773 return ret;
777 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
778 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
780 return ldb_next_prepare_commit(module);
784 /* end a transaction */
785 static int partition_end_trans(struct ldb_module *module)
787 int ret, ret2;
788 unsigned int i;
789 struct partition_private_data *data = talloc_get_type(module->private_data,
790 struct partition_private_data);
792 ret = LDB_SUCCESS;
794 if (data->in_transaction == 0) {
795 DEBUG(0,("partition end transaction mismatch\n"));
796 ret = LDB_ERR_OPERATIONS_ERROR;
797 } else {
798 data->in_transaction--;
801 for (i=0; data && data->partitions && data->partitions[i]; i++) {
802 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
803 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s",
804 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
806 ret2 = ldb_next_end_trans(data->partitions[i]->module);
807 if (ret2 != LDB_SUCCESS) {
808 ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s",
809 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
810 ldb_errstring(module->ldb));
811 ret = ret2;
815 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
816 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
818 ret2 = ldb_next_end_trans(module);
819 if (ret2 != LDB_SUCCESS) {
820 ret = ret2;
822 return ret;
825 /* delete a transaction */
826 static int partition_del_trans(struct ldb_module *module)
828 int ret, final_ret = LDB_SUCCESS;
829 unsigned int i;
830 struct partition_private_data *data = talloc_get_type(module->private_data,
831 struct partition_private_data);
832 for (i=0; data && data->partitions && data->partitions[i]; i++) {
833 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
834 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s",
835 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
837 ret = ldb_next_del_trans(data->partitions[i]->module);
838 if (ret != LDB_SUCCESS) {
839 ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s",
840 ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
841 ldb_errstring(module->ldb));
842 final_ret = ret;
846 if (data->in_transaction == 0) {
847 DEBUG(0,("partition del transaction mismatch\n"));
848 return LDB_ERR_OPERATIONS_ERROR;
850 data->in_transaction--;
852 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
853 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
855 ret = ldb_next_del_trans(module);
856 if (ret != LDB_SUCCESS) {
857 final_ret = ret;
859 return final_ret;
862 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx,
863 enum ldb_sequence_type type, uint64_t *seq_number)
865 int ret;
866 struct ldb_result *res;
867 struct ldb_seqnum_request *tseq;
868 struct ldb_request *treq;
869 struct ldb_seqnum_result *seqr;
870 res = talloc_zero(mem_ctx, struct ldb_result);
871 if (res == NULL) {
872 return LDB_ERR_OPERATIONS_ERROR;
874 tseq = talloc_zero(res, struct ldb_seqnum_request);
875 if (tseq == NULL) {
876 talloc_free(res);
877 return LDB_ERR_OPERATIONS_ERROR;
879 tseq->type = type;
881 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
882 LDB_EXTENDED_SEQUENCE_NUMBER,
883 tseq,
884 NULL,
885 res,
886 ldb_extended_default_callback,
887 NULL);
888 if (ret != LDB_SUCCESS) {
889 talloc_free(res);
890 return ret;
893 ret = ldb_next_request(module, treq);
894 if (ret != LDB_SUCCESS) {
895 talloc_free(res);
896 return ret;
898 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
899 if (ret != LDB_SUCCESS) {
900 talloc_free(res);
901 return ret;
904 seqr = talloc_get_type(res->extended->data,
905 struct ldb_seqnum_result);
906 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
907 ret = LDB_ERR_OPERATIONS_ERROR;
908 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
909 talloc_free(res);
910 return ret;
911 } else {
912 *seq_number = seqr->seq_num;
914 talloc_free(res);
915 return LDB_SUCCESS;
918 /* FIXME: This function is still semi-async */
919 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
921 int ret;
922 unsigned int i;
923 uint64_t seq_number = 0;
924 uint64_t timestamp_sequence = 0;
925 uint64_t timestamp = 0;
926 struct partition_private_data *data = talloc_get_type(module->private_data,
927 struct partition_private_data);
928 struct ldb_seqnum_request *seq;
929 struct ldb_seqnum_result *seqr;
930 struct ldb_request *treq;
931 struct ldb_seqnum_request *tseq;
932 struct ldb_seqnum_result *tseqr;
933 struct ldb_extended *ext;
934 struct ldb_result *res;
935 struct dsdb_partition *p;
937 p = find_partition(data, NULL, req);
938 if (p != NULL) {
939 /* the caller specified what partition they want the
940 * sequence number operation on - just pass it on
942 return ldb_next_request(p->module, req);
945 seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
947 switch (seq->type) {
948 case LDB_SEQ_NEXT:
949 case LDB_SEQ_HIGHEST_SEQ:
951 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
952 if (ret != LDB_SUCCESS) {
953 return ret;
956 /* Skip the lot if 'data' isn't here yet (initialisation) */
957 for (i=0; data && data->partitions && data->partitions[i]; i++) {
959 res = talloc_zero(req, struct ldb_result);
960 if (res == NULL) {
961 return LDB_ERR_OPERATIONS_ERROR;
963 tseq = talloc_zero(res, struct ldb_seqnum_request);
964 if (tseq == NULL) {
965 talloc_free(res);
966 return LDB_ERR_OPERATIONS_ERROR;
968 tseq->type = seq->type;
970 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
971 LDB_EXTENDED_SEQUENCE_NUMBER,
972 tseq,
973 NULL,
974 res,
975 ldb_extended_default_callback,
976 NULL);
977 if (ret != LDB_SUCCESS) {
978 talloc_free(res);
979 return ret;
982 if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
983 ret = ldb_request_add_control(treq,
984 DSDB_CONTROL_CURRENT_PARTITION_OID,
985 false, data->partitions[i]->ctrl);
986 if (ret != LDB_SUCCESS) {
987 talloc_free(res);
988 return ret;
992 ret = partition_request(data->partitions[i]->module, treq);
993 if (ret != LDB_SUCCESS) {
994 talloc_free(res);
995 return ret;
997 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
998 if (ret != LDB_SUCCESS) {
999 talloc_free(res);
1000 return ret;
1002 tseqr = talloc_get_type(res->extended->data,
1003 struct ldb_seqnum_result);
1004 if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1005 timestamp_sequence = MAX(timestamp_sequence,
1006 tseqr->seq_num);
1007 } else {
1008 seq_number += tseqr->seq_num;
1010 talloc_free(res);
1012 /* fall through */
1013 case LDB_SEQ_HIGHEST_TIMESTAMP:
1015 res = talloc_zero(req, struct ldb_result);
1016 if (res == NULL) {
1017 return LDB_ERR_OPERATIONS_ERROR;
1020 tseq = talloc_zero(res, struct ldb_seqnum_request);
1021 if (tseq == NULL) {
1022 talloc_free(res);
1023 return LDB_ERR_OPERATIONS_ERROR;
1025 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1027 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1028 LDB_EXTENDED_SEQUENCE_NUMBER,
1029 tseq,
1030 NULL,
1031 res,
1032 ldb_extended_default_callback,
1033 NULL);
1034 if (ret != LDB_SUCCESS) {
1035 talloc_free(res);
1036 return ret;
1039 ret = ldb_next_request(module, treq);
1040 if (ret != LDB_SUCCESS) {
1041 talloc_free(res);
1042 return ret;
1044 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1045 if (ret != LDB_SUCCESS) {
1046 talloc_free(res);
1047 return ret;
1050 tseqr = talloc_get_type(res->extended->data,
1051 struct ldb_seqnum_result);
1052 timestamp = tseqr->seq_num;
1054 talloc_free(res);
1056 /* Skip the lot if 'data' isn't here yet (initialisation) */
1057 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1059 res = talloc_zero(req, struct ldb_result);
1060 if (res == NULL) {
1061 return LDB_ERR_OPERATIONS_ERROR;
1064 tseq = talloc_zero(res, struct ldb_seqnum_request);
1065 if (tseq == NULL) {
1066 talloc_free(res);
1067 return LDB_ERR_OPERATIONS_ERROR;
1069 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1071 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1072 LDB_EXTENDED_SEQUENCE_NUMBER,
1073 tseq,
1074 NULL,
1075 res,
1076 ldb_extended_default_callback,
1077 NULL);
1078 if (ret != LDB_SUCCESS) {
1079 talloc_free(res);
1080 return ret;
1083 if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
1084 ret = ldb_request_add_control(treq,
1085 DSDB_CONTROL_CURRENT_PARTITION_OID,
1086 false, data->partitions[i]->ctrl);
1087 if (ret != LDB_SUCCESS) {
1088 talloc_free(res);
1089 return ret;
1093 ret = partition_request(data->partitions[i]->module, treq);
1094 if (ret != LDB_SUCCESS) {
1095 talloc_free(res);
1096 return ret;
1098 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1099 if (ret != LDB_SUCCESS) {
1100 talloc_free(res);
1101 return ret;
1104 tseqr = talloc_get_type(res->extended->data,
1105 struct ldb_seqnum_result);
1106 timestamp = MAX(timestamp, tseqr->seq_num);
1108 talloc_free(res);
1111 break;
1114 ext = talloc_zero(req, struct ldb_extended);
1115 if (!ext) {
1116 return LDB_ERR_OPERATIONS_ERROR;
1118 seqr = talloc_zero(ext, struct ldb_seqnum_result);
1119 if (seqr == NULL) {
1120 talloc_free(ext);
1121 return LDB_ERR_OPERATIONS_ERROR;
1123 ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1124 ext->data = seqr;
1126 switch (seq->type) {
1127 case LDB_SEQ_NEXT:
1128 case LDB_SEQ_HIGHEST_SEQ:
1130 /* Has someone above set a timebase sequence? */
1131 if (timestamp_sequence) {
1132 seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1133 } else {
1134 seqr->seq_num = seq_number;
1137 if (timestamp_sequence > seqr->seq_num) {
1138 seqr->seq_num = timestamp_sequence;
1139 seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1142 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1143 break;
1144 case LDB_SEQ_HIGHEST_TIMESTAMP:
1145 seqr->seq_num = timestamp;
1146 break;
1149 if (seq->type == LDB_SEQ_NEXT) {
1150 seqr->seq_num++;
1153 /* send request done */
1154 return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1157 /* extended */
1158 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1160 struct partition_private_data *data;
1161 struct partition_context *ac;
1162 int ret;
1164 data = talloc_get_type(module->private_data, struct partition_private_data);
1165 if (!data) {
1166 return ldb_next_request(module, req);
1169 ret = partition_reload_if_required(module, data);
1170 if (ret != LDB_SUCCESS) {
1171 return ret;
1174 if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1175 return partition_sequence_number(module, req);
1178 if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1179 return partition_create(module, req);
1183 * as the extended operation has no dn
1184 * we need to send it to all partitions
1187 ac = partition_init_ctx(module, req);
1188 if (!ac) {
1189 return LDB_ERR_OPERATIONS_ERROR;
1192 return partition_send_all(module, ac, req);
1195 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1196 .name = "partition",
1197 .init_context = partition_init,
1198 .search = partition_search,
1199 .add = partition_add,
1200 .modify = partition_modify,
1201 .del = partition_delete,
1202 .rename = partition_rename,
1203 .extended = partition_extended,
1204 .start_transaction = partition_start_trans,
1205 .prepare_commit = partition_prepare_commit,
1206 .end_transaction = partition_end_trans,
1207 .del_transaction = partition_del_trans,