4 Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5 Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program. If not, see <http://www.gnu.org/licenses/>.
24 * Component: ldb partitions module
26 * Description: Implement LDAP partitions
28 * Author: Andrew Bartlett
29 * Author: Stefan Metzmacher
32 #include "dsdb/samdb/ldb_modules/partition.h"
35 struct ldb_module
*module
;
36 struct ldb_request
*req
;
39 struct partition_context
{
40 struct ldb_module
*module
;
41 struct ldb_request
*req
;
43 struct part_request
*part_req
;
44 unsigned int num_requests
;
45 unsigned int finished_requests
;
47 const char **referrals
;
50 static struct partition_context
*partition_init_ctx(struct ldb_module
*module
, struct ldb_request
*req
)
52 struct partition_context
*ac
;
54 ac
= talloc_zero(req
, struct partition_context
);
56 ldb_set_errstring(ldb_module_get_ctx(module
), "Out of Memory");
67 * helper functions to call the next module in chain
69 int partition_request(struct ldb_module
*module
, struct ldb_request
*request
)
71 if ((module
&& ldb_module_flags(ldb_module_get_ctx(module
)) & LDB_FLG_ENABLE_TRACING
)) { \
72 const struct dsdb_control_current_partition
*partition
= NULL
;
73 struct ldb_control
*partition_ctrl
= ldb_request_get_control(request
, DSDB_CONTROL_CURRENT_PARTITION_OID
);
75 partition
= talloc_get_type(partition_ctrl
->data
,
76 struct dsdb_control_current_partition
);
79 if (partition
!= NULL
) {
80 ldb_debug(ldb_module_get_ctx(module
), LDB_DEBUG_TRACE
, "partition_request() -> %s",
81 ldb_dn_get_linearized(partition
->dn
));
83 ldb_debug(ldb_module_get_ctx(module
), LDB_DEBUG_TRACE
, "partition_request() -> (metadata partition)");
87 return ldb_next_request(module
, request
);
90 static struct dsdb_partition
*find_partition(struct partition_private_data
*data
,
92 struct ldb_request
*req
)
95 struct ldb_control
*partition_ctrl
;
97 /* see if the request has the partition DN specified in a
98 * control. The repl_meta_data module can specify this to
99 * ensure that replication happens to the right partition
101 partition_ctrl
= ldb_request_get_control(req
, DSDB_CONTROL_CURRENT_PARTITION_OID
);
102 if (partition_ctrl
) {
103 const struct dsdb_control_current_partition
*partition
;
104 partition
= talloc_get_type(partition_ctrl
->data
,
105 struct dsdb_control_current_partition
);
106 if (partition
!= NULL
) {
115 /* Look at base DN */
116 /* Figure out which partition it is under */
117 /* Skip the lot if 'data' isn't here yet (initialisation) */
118 for (i
=0; data
&& data
->partitions
&& data
->partitions
[i
]; i
++) {
119 if (ldb_dn_compare_base(data
->partitions
[i
]->ctrl
->dn
, dn
) == 0) {
120 return data
->partitions
[i
];
128 * fire the caller's callback for every entry, but only send 'done' once.
130 static int partition_req_callback(struct ldb_request
*req
,
131 struct ldb_reply
*ares
)
133 struct partition_context
*ac
;
134 struct ldb_module
*module
;
135 struct ldb_request
*nreq
;
137 struct ldb_control
*partition_ctrl
;
139 ac
= talloc_get_type(req
->context
, struct partition_context
);
142 return ldb_module_done(ac
->req
, NULL
, NULL
,
143 LDB_ERR_OPERATIONS_ERROR
);
146 partition_ctrl
= ldb_request_get_control(req
, DSDB_CONTROL_CURRENT_PARTITION_OID
);
147 if (partition_ctrl
&& (ac
->num_requests
== 1 || ares
->type
== LDB_REPLY_ENTRY
)) {
148 /* If we didn't fan this request out to mulitple partitions,
149 * or this is an individual search result, we can
150 * deterministically tell the caller what partition this was
151 * written to (repl_meta_data likes to know) */
152 ret
= ldb_reply_add_control(ares
,
153 DSDB_CONTROL_CURRENT_PARTITION_OID
,
154 false, partition_ctrl
->data
);
155 if (ret
!= LDB_SUCCESS
) {
156 return ldb_module_done(ac
->req
, NULL
, NULL
,
161 if (ares
->error
!= LDB_SUCCESS
) {
162 return ldb_module_done(ac
->req
, ares
->controls
,
163 ares
->response
, ares
->error
);
166 switch (ares
->type
) {
167 case LDB_REPLY_REFERRAL
:
168 return ldb_module_send_referral(ac
->req
, ares
->referral
);
170 case LDB_REPLY_ENTRY
:
171 if (ac
->req
->operation
!= LDB_SEARCH
) {
172 ldb_set_errstring(ldb_module_get_ctx(ac
->module
),
173 "partition_req_callback:"
174 " Unsupported reply type for this request");
175 return ldb_module_done(ac
->req
, NULL
, NULL
,
176 LDB_ERR_OPERATIONS_ERROR
);
179 return ldb_module_send_entry(ac
->req
, ares
->message
, ares
->controls
);
182 if (ac
->req
->operation
== LDB_EXTENDED
) {
183 /* FIXME: check for ares->response, replmd does not fill it ! */
184 if (ares
->response
) {
185 if (strcmp(ares
->response
->oid
, LDB_EXTENDED_START_TLS_OID
) != 0) {
186 ldb_set_errstring(ldb_module_get_ctx(ac
->module
),
187 "partition_req_callback:"
188 " Unknown extended reply, "
189 "only supports START_TLS");
191 return ldb_module_done(ac
->req
, NULL
, NULL
,
192 LDB_ERR_OPERATIONS_ERROR
);
197 ac
->finished_requests
++;
198 if (ac
->finished_requests
== ac
->num_requests
) {
199 /* Send back referrals if they do exist (search ops) */
200 if (ac
->referrals
!= NULL
) {
202 for (ref
= ac
->referrals
; *ref
!= NULL
; ++ref
) {
203 ret
= ldb_module_send_referral(ac
->req
,
204 talloc_strdup(ac
->req
, *ref
));
205 if (ret
!= LDB_SUCCESS
) {
206 return ldb_module_done(ac
->req
, NULL
, NULL
,
212 /* this was the last one, call callback */
213 return ldb_module_done(ac
->req
, ares
->controls
,
218 /* not the last, now call the next one */
219 module
= ac
->part_req
[ac
->finished_requests
].module
;
220 nreq
= ac
->part_req
[ac
->finished_requests
].req
;
222 ret
= partition_request(module
, nreq
);
223 if (ret
!= LDB_SUCCESS
) {
225 return ldb_module_done(ac
->req
, NULL
, NULL
, ret
);
235 static int partition_prep_request(struct partition_context
*ac
,
236 struct dsdb_partition
*partition
)
239 struct ldb_request
*req
;
240 struct ldb_control
*partition_ctrl
= NULL
;
242 ac
->part_req
= talloc_realloc(ac
, ac
->part_req
,
244 ac
->num_requests
+ 1);
245 if (ac
->part_req
== NULL
) {
246 return ldb_oom(ldb_module_get_ctx(ac
->module
));
249 switch (ac
->req
->operation
) {
251 ret
= ldb_build_search_req_ex(&req
, ldb_module_get_ctx(ac
->module
),
253 ac
->req
->op
.search
.base
,
254 ac
->req
->op
.search
.scope
,
255 ac
->req
->op
.search
.tree
,
256 ac
->req
->op
.search
.attrs
,
258 ac
, partition_req_callback
,
260 LDB_REQ_SET_LOCATION(req
);
263 ret
= ldb_build_add_req(&req
, ldb_module_get_ctx(ac
->module
), ac
->part_req
,
264 ac
->req
->op
.add
.message
,
266 ac
, partition_req_callback
,
268 LDB_REQ_SET_LOCATION(req
);
271 ret
= ldb_build_mod_req(&req
, ldb_module_get_ctx(ac
->module
), ac
->part_req
,
272 ac
->req
->op
.mod
.message
,
274 ac
, partition_req_callback
,
276 LDB_REQ_SET_LOCATION(req
);
279 ret
= ldb_build_del_req(&req
, ldb_module_get_ctx(ac
->module
), ac
->part_req
,
282 ac
, partition_req_callback
,
284 LDB_REQ_SET_LOCATION(req
);
287 ret
= ldb_build_rename_req(&req
, ldb_module_get_ctx(ac
->module
), ac
->part_req
,
288 ac
->req
->op
.rename
.olddn
,
289 ac
->req
->op
.rename
.newdn
,
291 ac
, partition_req_callback
,
293 LDB_REQ_SET_LOCATION(req
);
296 ret
= ldb_build_extended_req(&req
, ldb_module_get_ctx(ac
->module
),
298 ac
->req
->op
.extended
.oid
,
299 ac
->req
->op
.extended
.data
,
301 ac
, partition_req_callback
,
303 LDB_REQ_SET_LOCATION(req
);
306 ldb_set_errstring(ldb_module_get_ctx(ac
->module
),
307 "Unsupported request type!");
308 ret
= LDB_ERR_UNWILLING_TO_PERFORM
;
311 if (ret
!= LDB_SUCCESS
) {
315 ac
->part_req
[ac
->num_requests
].req
= req
;
317 if (ac
->req
->controls
) {
318 /* Duplicate everything beside the current partition control */
319 partition_ctrl
= ldb_request_get_control(ac
->req
,
320 DSDB_CONTROL_CURRENT_PARTITION_OID
);
321 if (!ldb_save_controls(partition_ctrl
, req
, NULL
)) {
322 return ldb_module_oom(ac
->module
);
327 void *part_data
= partition
->ctrl
;
329 ac
->part_req
[ac
->num_requests
].module
= partition
->module
;
331 if (partition_ctrl
!= NULL
) {
332 if (partition_ctrl
->data
!= NULL
) {
333 part_data
= partition_ctrl
->data
;
337 * If the provided current partition control is without
338 * data then use the calculated one.
340 ret
= ldb_request_add_control(req
,
341 DSDB_CONTROL_CURRENT_PARTITION_OID
,
343 if (ret
!= LDB_SUCCESS
) {
348 if (req
->operation
== LDB_SEARCH
) {
349 /* If the search is for 'more' than this partition,
350 * then change the basedn, so a remote LDAP server
352 if (ldb_dn_compare_base(partition
->ctrl
->dn
,
353 req
->op
.search
.base
) != 0) {
354 req
->op
.search
.base
= partition
->ctrl
->dn
;
359 /* make sure you put the module here, or
360 * or ldb_next_request() will skip a module */
361 ac
->part_req
[ac
->num_requests
].module
= ac
->module
;
369 static int partition_call_first(struct partition_context
*ac
)
371 return partition_request(ac
->part_req
[0].module
, ac
->part_req
[0].req
);
375 * Send a request down to all the partitions
377 static int partition_send_all(struct ldb_module
*module
,
378 struct partition_context
*ac
,
379 struct ldb_request
*req
)
382 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
383 struct partition_private_data
);
384 int ret
= partition_prep_request(ac
, NULL
);
385 if (ret
!= LDB_SUCCESS
) {
388 for (i
=0; data
&& data
->partitions
&& data
->partitions
[i
]; i
++) {
389 ret
= partition_prep_request(ac
, data
->partitions
[i
]);
390 if (ret
!= LDB_SUCCESS
) {
395 /* fire the first one */
396 return partition_call_first(ac
);
401 * send an operation to the top partition, then copy the resulting
402 * object to all other partitions
404 static int partition_copy_all(struct ldb_module
*module
,
405 struct partition_context
*ac
,
406 struct ldb_request
*req
,
410 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
411 struct partition_private_data
);
413 struct ldb_result
*res
;
415 /* do the request on the top level sam.ldb synchronously */
416 ret
= ldb_next_request(module
, req
);
417 if (ret
!= LDB_SUCCESS
) {
420 ret
= ldb_wait(req
->handle
, LDB_WAIT_ALL
);
421 if (ret
!= LDB_SUCCESS
) {
425 /* now fetch the resulting object, and then copy it to all the
426 * other partitions. We need this approach to cope with the
427 * partitions getting out of sync. If for example the
428 * @ATTRIBUTES object exists on one partition but not the
429 * others then just doing each of the partitions in turn will
432 search_ret
= dsdb_module_search_dn(module
, ac
, &res
, dn
, NULL
, DSDB_FLAG_NEXT_MODULE
, req
);
433 if (search_ret
!= LDB_SUCCESS
&& ret
!= LDB_ERR_NO_SUCH_OBJECT
) {
437 /* now delete the object in the other partitions. Once that is
438 done we will re-add the object, if search_ret was not
439 LDB_ERR_NO_SUCH_OBJECT
441 for (i
=0; data
->partitions
&& data
->partitions
[i
]; i
++) {
443 pret
= dsdb_module_del(data
->partitions
[i
]->module
, dn
, DSDB_FLAG_NEXT_MODULE
, req
);
444 if (pret
!= LDB_SUCCESS
&& pret
!= LDB_ERR_NO_SUCH_OBJECT
) {
445 /* we should only get success or no
446 such object from the other partitions */
452 if (search_ret
!= LDB_ERR_NO_SUCH_OBJECT
) {
453 /* now re-add in the other partitions */
454 for (i
=0; data
->partitions
&& data
->partitions
[i
]; i
++) {
456 pret
= dsdb_module_add(data
->partitions
[i
]->module
, res
->msgs
[0], DSDB_FLAG_NEXT_MODULE
, req
);
457 if (pret
!= LDB_SUCCESS
) {
463 return ldb_module_done(req
, NULL
, NULL
, LDB_SUCCESS
);
467 * Figure out which backend a request needs to be aimed at. Some
468 * requests must be replicated to all backends
470 static int partition_replicate(struct ldb_module
*module
, struct ldb_request
*req
, struct ldb_dn
*dn
)
472 struct partition_context
*ac
;
475 struct dsdb_partition
*partition
;
476 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
477 struct partition_private_data
);
479 /* if we aren't initialised yet go further */
480 if (!data
|| !data
->partitions
) {
481 return ldb_next_request(module
, req
);
484 if (ldb_dn_is_special(dn
)) {
485 /* Is this a special DN, we need to replicate to every backend? */
486 for (i
=0; data
->replicate
&& data
->replicate
[i
]; i
++) {
487 if (ldb_dn_compare(data
->replicate
[i
],
490 ac
= partition_init_ctx(module
, req
);
492 return ldb_operr(ldb_module_get_ctx(module
));
495 return partition_copy_all(module
, ac
, req
, dn
);
500 /* Otherwise, we need to find the partition to fire it to */
503 partition
= find_partition(data
, dn
, req
);
506 * if we haven't found a matching partition
507 * pass the request to the main ldb
509 * TODO: we should maybe return an error here
510 * if it's not a special dn
513 return ldb_next_request(module
, req
);
516 ac
= partition_init_ctx(module
, req
);
518 return ldb_operr(ldb_module_get_ctx(module
));
521 /* we need to add a control but we never touch the original request */
522 ret
= partition_prep_request(ac
, partition
);
523 if (ret
!= LDB_SUCCESS
) {
527 /* fire the first one */
528 return partition_call_first(ac
);
532 static int partition_search(struct ldb_module
*module
, struct ldb_request
*req
)
534 struct ldb_control
**saved_controls
;
536 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
537 struct partition_private_data
);
538 struct partition_context
*ac
;
539 struct ldb_context
*ldb
;
540 struct loadparm_context
*lp_ctx
;
542 struct ldb_control
*search_control
= ldb_request_get_control(req
, LDB_CONTROL_SEARCH_OPTIONS_OID
);
543 struct ldb_control
*domain_scope_control
= ldb_request_get_control(req
, LDB_CONTROL_DOMAIN_SCOPE_OID
);
544 struct ldb_control
*no_gc_control
= ldb_request_get_control(req
, DSDB_CONTROL_NO_GLOBAL_CATALOG
);
546 struct ldb_search_options_control
*search_options
= NULL
;
547 struct dsdb_partition
*p
;
550 bool domain_scope
= false, phantom_root
= false;
552 p
= find_partition(data
, NULL
, req
);
554 /* the caller specified what partition they want the
555 * search - just pass it on
557 return ldb_next_request(p
->module
, req
);
560 /* Get back the search options from the search control, and mark it as
561 * non-critical (to make backends and also dcpromo happy).
563 if (search_control
) {
564 search_options
= talloc_get_type(search_control
->data
, struct ldb_search_options_control
);
565 search_control
->critical
= 0;
569 /* Remove the "domain_scope" control, so we don't confuse a backend
571 if (domain_scope_control
&& !ldb_save_controls(domain_scope_control
, req
, &saved_controls
)) {
572 return ldb_oom(ldb_module_get_ctx(module
));
575 /* if we aren't initialised yet go further */
576 if (!data
|| !data
->partitions
) {
577 return ldb_next_request(module
, req
);
580 /* Special DNs without specified partition should go further */
581 if (ldb_dn_is_special(req
->op
.search
.base
)) {
582 return ldb_next_request(module
, req
);
585 /* Locate the options */
586 domain_scope
= (search_options
587 && (search_options
->search_options
& LDB_SEARCH_OPTION_DOMAIN_SCOPE
))
588 || domain_scope_control
;
589 phantom_root
= search_options
590 && (search_options
->search_options
& LDB_SEARCH_OPTION_PHANTOM_ROOT
);
592 /* Remove handled options from the search control flag */
593 if (search_options
) {
594 search_options
->search_options
= search_options
->search_options
595 & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
596 & ~LDB_SEARCH_OPTION_PHANTOM_ROOT
;
599 ac
= partition_init_ctx(module
, req
);
601 return ldb_operr(ldb_module_get_ctx(module
));
604 ldb
= ldb_module_get_ctx(ac
->module
);
605 lp_ctx
= talloc_get_type(ldb_get_opaque(ldb
, "loadparm"),
606 struct loadparm_context
);
608 /* Search from the base DN */
609 if (ldb_dn_is_null(req
->op
.search
.base
)) {
611 return ldb_error(ldb
, LDB_ERR_NO_SUCH_OBJECT
, "empty base DN");
613 return partition_send_all(module
, ac
, req
);
616 for (i
=0; data
->partitions
[i
]; i
++) {
617 bool match
= false, stop
= false;
619 if (data
->partitions
[i
]->partial_replica
&& no_gc_control
!= NULL
) {
620 if (ldb_dn_compare_base(data
->partitions
[i
]->ctrl
->dn
,
621 req
->op
.search
.base
) == 0) {
622 /* base DN is in a partial replica
623 with the NO_GLOBAL_CATALOG
624 control. This partition is invisible */
625 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
631 /* Phantom root: Find all partitions under the
632 * search base. We match if:
634 * 1) the DN we are looking for exactly matches a
635 * certain partition and always stop
636 * 2) the DN we are looking for is a parent of certain
637 * partitions and it isn't a scope base search
638 * 3) the DN we are looking for is a child of a certain
639 * partition and always stop
640 * - we don't need to go any further up in the
643 if (ldb_dn_compare(data
->partitions
[i
]->ctrl
->dn
,
644 req
->op
.search
.base
) == 0) {
649 (ldb_dn_compare_base(req
->op
.search
.base
,
650 data
->partitions
[i
]->ctrl
->dn
) == 0 &&
651 req
->op
.search
.scope
!= LDB_SCOPE_BASE
)) {
655 ldb_dn_compare_base(data
->partitions
[i
]->ctrl
->dn
,
656 req
->op
.search
.base
) == 0) {
658 stop
= true; /* note that this relies on partition ordering */
661 /* Domain scope: Find all partitions under the search
664 * We generate referral candidates if we haven't
665 * specified the domain scope control, haven't a base
666 * search* scope and the DN we are looking for is a real
667 * predecessor of certain partitions. When a new
668 * referral candidate is nearer to the DN than an
669 * existing one delete the latter (we want to have only
670 * the closest ones). When we checked this for all
671 * candidates we have the final referrals.
673 * We match if the DN we are looking for is a child of
674 * a certain partition or the partition
675 * DN itself - we don't need to go any further
676 * up in the hierarchy!
678 if ((!domain_scope
) &&
679 (req
->op
.search
.scope
!= LDB_SCOPE_BASE
) &&
680 (ldb_dn_compare_base(req
->op
.search
.base
,
681 data
->partitions
[i
]->ctrl
->dn
) == 0) &&
682 (ldb_dn_compare(req
->op
.search
.base
,
683 data
->partitions
[i
]->ctrl
->dn
) != 0)) {
684 char *ref
= talloc_asprintf(ac
,
686 lpcfg_dnsdomain(lp_ctx
),
687 ldb_dn_get_linearized(data
->partitions
[i
]->ctrl
->dn
),
688 req
->op
.search
.scope
== LDB_SCOPE_ONELEVEL
? "??base" : "");
694 /* Initialise the referrals list */
695 if (ac
->referrals
== NULL
) {
696 ac
->referrals
= (const char **) str_list_make_empty(ac
);
697 if (ac
->referrals
== NULL
) {
702 /* Check if the new referral candidate is
703 * closer to the base DN than already
704 * saved ones and delete the latters */
706 while (ac
->referrals
[j
] != NULL
) {
707 if (strstr(ac
->referrals
[j
],
708 ldb_dn_get_linearized(data
->partitions
[i
]->ctrl
->dn
)) != NULL
) {
709 str_list_remove(ac
->referrals
,
716 /* Add our new candidate */
717 ac
->referrals
= str_list_add(ac
->referrals
, ref
);
721 if (ac
->referrals
== NULL
) {
725 if (ldb_dn_compare_base(data
->partitions
[i
]->ctrl
->dn
, req
->op
.search
.base
) == 0) {
727 stop
= true; /* note that this relies on partition ordering */
732 ret
= partition_prep_request(ac
, data
->partitions
[i
]);
733 if (ret
!= LDB_SUCCESS
) {
741 /* Perhaps we didn't match any partitions. Try the main partition */
742 if (ac
->num_requests
== 0) {
744 return ldb_next_request(module
, req
);
747 /* fire the first one */
748 return partition_call_first(ac
);
752 static int partition_add(struct ldb_module
*module
, struct ldb_request
*req
)
754 return partition_replicate(module
, req
, req
->op
.add
.message
->dn
);
758 static int partition_modify(struct ldb_module
*module
, struct ldb_request
*req
)
760 return partition_replicate(module
, req
, req
->op
.mod
.message
->dn
);
764 static int partition_delete(struct ldb_module
*module
, struct ldb_request
*req
)
766 return partition_replicate(module
, req
, req
->op
.del
.dn
);
770 static int partition_rename(struct ldb_module
*module
, struct ldb_request
*req
)
773 struct dsdb_partition
*backend
, *backend2
;
775 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
776 struct partition_private_data
);
778 /* Skip the lot if 'data' isn't here yet (initialisation) */
780 return ldb_operr(ldb_module_get_ctx(module
));
783 backend
= find_partition(data
, req
->op
.rename
.olddn
, req
);
784 backend2
= find_partition(data
, req
->op
.rename
.newdn
, req
);
786 if ((backend
&& !backend2
) || (!backend
&& backend2
)) {
787 return LDB_ERR_AFFECTS_MULTIPLE_DSAS
;
790 if (backend
!= backend2
) {
791 ldb_asprintf_errstring(ldb_module_get_ctx(module
),
792 "Cannot rename from %s in %s to %s in %s: %s",
793 ldb_dn_get_linearized(req
->op
.rename
.olddn
),
794 ldb_dn_get_linearized(backend
->ctrl
->dn
),
795 ldb_dn_get_linearized(req
->op
.rename
.newdn
),
796 ldb_dn_get_linearized(backend2
->ctrl
->dn
),
797 ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS
));
798 return LDB_ERR_AFFECTS_MULTIPLE_DSAS
;
801 return partition_replicate(module
, req
, req
->op
.rename
.olddn
);
804 /* start a transaction */
805 static int partition_start_trans(struct ldb_module
*module
)
809 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
810 struct partition_private_data
);
811 /* Look at base DN */
812 /* Figure out which partition it is under */
813 /* Skip the lot if 'data' isn't here yet (initialization) */
814 if ((module
&& ldb_module_flags(ldb_module_get_ctx(module
)) & LDB_FLG_ENABLE_TRACING
)) {
815 ldb_debug(ldb_module_get_ctx(module
), LDB_DEBUG_TRACE
, "partition_start_trans() -> (metadata partition)");
817 ret
= ldb_next_start_trans(module
);
818 if (ret
!= LDB_SUCCESS
) {
822 ret
= partition_reload_if_required(module
, data
, NULL
);
823 if (ret
!= LDB_SUCCESS
) {
824 ldb_next_del_trans(module
);
828 ret
= partition_metadata_start_trans(module
);
829 if (ret
!= LDB_SUCCESS
) {
830 ldb_next_del_trans(module
);
834 for (i
=0; data
&& data
->partitions
&& data
->partitions
[i
]; i
++) {
835 if ((module
&& ldb_module_flags(ldb_module_get_ctx(module
)) & LDB_FLG_ENABLE_TRACING
)) {
836 ldb_debug(ldb_module_get_ctx(module
), LDB_DEBUG_TRACE
, "partition_start_trans() -> %s",
837 ldb_dn_get_linearized(data
->partitions
[i
]->ctrl
->dn
));
839 ret
= ldb_next_start_trans(data
->partitions
[i
]->module
);
840 if (ret
!= LDB_SUCCESS
) {
841 /* Back it out, if it fails on one */
842 for (i
--; i
>= 0; i
--) {
843 ldb_next_del_trans(data
->partitions
[i
]->module
);
845 ldb_next_del_trans(module
);
846 partition_metadata_del_trans(module
);
851 data
->in_transaction
++;
856 /* prepare for a commit */
857 static int partition_prepare_commit(struct ldb_module
*module
)
860 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
861 struct partition_private_data
);
864 for (i
=0; data
&& data
->partitions
&& data
->partitions
[i
]; i
++) {
865 if ((module
&& ldb_module_flags(ldb_module_get_ctx(module
)) & LDB_FLG_ENABLE_TRACING
)) {
866 ldb_debug(ldb_module_get_ctx(module
), LDB_DEBUG_TRACE
, "partition_prepare_commit() -> %s",
867 ldb_dn_get_linearized(data
->partitions
[i
]->ctrl
->dn
));
869 ret
= ldb_next_prepare_commit(data
->partitions
[i
]->module
);
870 if (ret
!= LDB_SUCCESS
) {
871 ldb_asprintf_errstring(ldb_module_get_ctx(module
), "prepare_commit error on %s: %s",
872 ldb_dn_get_linearized(data
->partitions
[i
]->ctrl
->dn
),
873 ldb_errstring(ldb_module_get_ctx(module
)));
878 if ((module
&& ldb_module_flags(ldb_module_get_ctx(module
)) & LDB_FLG_ENABLE_TRACING
)) {
879 ldb_debug(ldb_module_get_ctx(module
), LDB_DEBUG_TRACE
, "partition_prepare_commit() -> (metadata partition)");
882 ret
= ldb_next_prepare_commit(module
);
883 if (ret
!= LDB_SUCCESS
) {
887 /* metadata prepare commit must come last, as other partitions could modify
888 * the database inside the prepare commit method of a module */
889 return partition_metadata_prepare_commit(module
);
893 /* end a transaction */
894 static int partition_end_trans(struct ldb_module
*module
)
898 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
899 struct partition_private_data
);
903 if (data
->in_transaction
== 0) {
904 DEBUG(0,("partition end transaction mismatch\n"));
905 ret
= LDB_ERR_OPERATIONS_ERROR
;
907 data
->in_transaction
--;
910 ret2
= partition_metadata_end_trans(module
);
911 if (ret2
!= LDB_SUCCESS
) {
915 for (i
=0; data
&& data
->partitions
&& data
->partitions
[i
]; i
++) {
916 if ((module
&& ldb_module_flags(ldb_module_get_ctx(module
)) & LDB_FLG_ENABLE_TRACING
)) {
917 ldb_debug(ldb_module_get_ctx(module
), LDB_DEBUG_TRACE
, "partition_end_trans() -> %s",
918 ldb_dn_get_linearized(data
->partitions
[i
]->ctrl
->dn
));
920 ret2
= ldb_next_end_trans(data
->partitions
[i
]->module
);
921 if (ret2
!= LDB_SUCCESS
) {
922 ldb_asprintf_errstring(ldb_module_get_ctx(module
), "end_trans error on %s: %s",
923 ldb_dn_get_linearized(data
->partitions
[i
]->ctrl
->dn
),
924 ldb_errstring(ldb_module_get_ctx(module
)));
929 if ((module
&& ldb_module_flags(ldb_module_get_ctx(module
)) & LDB_FLG_ENABLE_TRACING
)) {
930 ldb_debug(ldb_module_get_ctx(module
), LDB_DEBUG_TRACE
, "partition_end_trans() -> (metadata partition)");
932 ret2
= ldb_next_end_trans(module
);
933 if (ret2
!= LDB_SUCCESS
) {
939 /* delete a transaction */
940 static int partition_del_trans(struct ldb_module
*module
)
942 int ret
, final_ret
= LDB_SUCCESS
;
944 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
945 struct partition_private_data
);
946 ret
= partition_metadata_del_trans(module
);
947 if (ret
!= LDB_SUCCESS
) {
951 for (i
=0; data
&& data
->partitions
&& data
->partitions
[i
]; i
++) {
952 if ((module
&& ldb_module_flags(ldb_module_get_ctx(module
)) & LDB_FLG_ENABLE_TRACING
)) {
953 ldb_debug(ldb_module_get_ctx(module
), LDB_DEBUG_TRACE
, "partition_del_trans() -> %s",
954 ldb_dn_get_linearized(data
->partitions
[i
]->ctrl
->dn
));
956 ret
= ldb_next_del_trans(data
->partitions
[i
]->module
);
957 if (ret
!= LDB_SUCCESS
) {
958 ldb_asprintf_errstring(ldb_module_get_ctx(module
), "del_trans error on %s: %s",
959 ldb_dn_get_linearized(data
->partitions
[i
]->ctrl
->dn
),
960 ldb_errstring(ldb_module_get_ctx(module
)));
965 if (data
->in_transaction
== 0) {
966 DEBUG(0,("partition del transaction mismatch\n"));
967 return ldb_operr(ldb_module_get_ctx(module
));
969 data
->in_transaction
--;
971 if ((module
&& ldb_module_flags(ldb_module_get_ctx(module
)) & LDB_FLG_ENABLE_TRACING
)) {
972 ldb_debug(ldb_module_get_ctx(module
), LDB_DEBUG_TRACE
, "partition_del_trans() -> (metadata partition)");
974 ret
= ldb_next_del_trans(module
);
975 if (ret
!= LDB_SUCCESS
) {
981 int partition_primary_sequence_number(struct ldb_module
*module
, TALLOC_CTX
*mem_ctx
,
982 uint64_t *seq_number
,
983 struct ldb_request
*parent
)
986 struct ldb_result
*res
;
987 struct ldb_seqnum_request
*tseq
;
988 struct ldb_seqnum_result
*seqr
;
990 tseq
= talloc_zero(mem_ctx
, struct ldb_seqnum_request
);
992 return ldb_oom(ldb_module_get_ctx(module
));
994 tseq
->type
= LDB_SEQ_HIGHEST_SEQ
;
996 ret
= dsdb_module_extended(module
, tseq
, &res
,
997 LDB_EXTENDED_SEQUENCE_NUMBER
,
999 DSDB_FLAG_NEXT_MODULE
,
1001 if (ret
!= LDB_SUCCESS
) {
1006 seqr
= talloc_get_type_abort(res
->extended
->data
,
1007 struct ldb_seqnum_result
);
1008 if (seqr
->flags
& LDB_SEQ_TIMESTAMP_SEQUENCE
) {
1010 return ldb_module_error(module
, LDB_ERR_OPERATIONS_ERROR
,
1011 "Primary backend in partition module returned a timestamp based seq");
1014 *seq_number
= seqr
->seq_num
;
1021 * Older version of sequence number as sum of sequence numbers for each partition
1023 int partition_sequence_number_from_partitions(struct ldb_module
*module
,
1028 uint64_t seq_number
= 0;
1029 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
1030 struct partition_private_data
);
1032 ret
= partition_primary_sequence_number(module
, data
, &seq_number
, NULL
);
1033 if (ret
!= LDB_SUCCESS
) {
1037 /* Skip the lot if 'data' isn't here yet (initialisation) */
1038 for (i
=0; data
&& data
->partitions
&& data
->partitions
[i
]; i
++) {
1039 struct ldb_seqnum_request
*tseq
;
1040 struct ldb_seqnum_result
*tseqr
;
1041 struct ldb_request
*treq
;
1042 struct ldb_result
*res
= talloc_zero(data
, struct ldb_result
);
1044 return ldb_oom(ldb_module_get_ctx(module
));
1046 tseq
= talloc_zero(res
, struct ldb_seqnum_request
);
1049 return ldb_oom(ldb_module_get_ctx(module
));
1051 tseq
->type
= LDB_SEQ_HIGHEST_SEQ
;
1053 ret
= ldb_build_extended_req(&treq
, ldb_module_get_ctx(module
), res
,
1054 LDB_EXTENDED_SEQUENCE_NUMBER
,
1058 ldb_extended_default_callback
,
1060 LDB_REQ_SET_LOCATION(treq
);
1061 if (ret
!= LDB_SUCCESS
) {
1066 ret
= partition_request(data
->partitions
[i
]->module
, treq
);
1067 if (ret
!= LDB_SUCCESS
) {
1071 ret
= ldb_wait(treq
->handle
, LDB_WAIT_ALL
);
1072 if (ret
!= LDB_SUCCESS
) {
1076 tseqr
= talloc_get_type(res
->extended
->data
,
1077 struct ldb_seqnum_result
);
1078 seq_number
+= tseqr
->seq_num
;
1088 * Newer version of sequence number using metadata tdb
1090 static int partition_sequence_number(struct ldb_module
*module
, struct ldb_request
*req
)
1092 struct ldb_extended
*ext
;
1093 struct ldb_seqnum_request
*seq
;
1094 struct ldb_seqnum_result
*seqr
;
1095 uint64_t seq_number
;
1098 seq
= talloc_get_type_abort(req
->op
.extended
.data
, struct ldb_seqnum_request
);
1099 switch (seq
->type
) {
1101 ret
= partition_metadata_sequence_number_increment(module
, &seq_number
);
1102 if (ret
!= LDB_SUCCESS
) {
1107 case LDB_SEQ_HIGHEST_SEQ
:
1108 ret
= partition_metadata_sequence_number(module
, &seq_number
);
1109 if (ret
!= LDB_SUCCESS
) {
1114 case LDB_SEQ_HIGHEST_TIMESTAMP
:
1115 return ldb_module_error(module
, LDB_ERR_OPERATIONS_ERROR
,
1116 "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1119 ext
= talloc_zero(req
, struct ldb_extended
);
1121 return ldb_module_oom(module
);
1123 seqr
= talloc_zero(ext
, struct ldb_seqnum_result
);
1126 return ldb_module_oom(module
);
1128 ext
->oid
= LDB_EXTENDED_SEQUENCE_NUMBER
;
1131 seqr
->seq_num
= seq_number
;
1132 seqr
->flags
|= LDB_SEQ_GLOBAL_SEQUENCE
;
1134 /* send request done */
1135 return ldb_module_done(req
, NULL
, ext
, LDB_SUCCESS
);
1139 static int partition_extended(struct ldb_module
*module
, struct ldb_request
*req
)
1141 struct partition_private_data
*data
= talloc_get_type(ldb_module_get_private(module
),
1142 struct partition_private_data
);
1143 struct partition_context
*ac
;
1146 /* if we aren't initialised yet go further */
1148 return ldb_next_request(module
, req
);
1151 if (strcmp(req
->op
.extended
.oid
, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID
) == 0) {
1152 /* Update the metadata.tdb to increment the schema version if needed*/
1153 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1154 ret
= partition_metadata_inc_schema_sequence(module
);
1155 return ldb_module_done(req
, NULL
, NULL
, ret
);
1158 if (strcmp(req
->op
.extended
.oid
, LDB_EXTENDED_SEQUENCE_NUMBER
) == 0) {
1159 return partition_sequence_number(module
, req
);
1162 if (strcmp(req
->op
.extended
.oid
, DSDB_EXTENDED_CREATE_PARTITION_OID
) == 0) {
1163 return partition_create(module
, req
);
1167 * as the extended operation has no dn
1168 * we need to send it to all partitions
1171 ac
= partition_init_ctx(module
, req
);
1173 return ldb_operr(ldb_module_get_ctx(module
));
1176 return partition_send_all(module
, ac
, req
);
1179 static const struct ldb_module_ops ldb_partition_module_ops
= {
1180 .name
= "partition",
1181 .init_context
= partition_init
,
1182 .search
= partition_search
,
1183 .add
= partition_add
,
1184 .modify
= partition_modify
,
1185 .del
= partition_delete
,
1186 .rename
= partition_rename
,
1187 .extended
= partition_extended
,
1188 .start_transaction
= partition_start_trans
,
1189 .prepare_commit
= partition_prepare_commit
,
1190 .end_transaction
= partition_end_trans
,
1191 .del_transaction
= partition_del_trans
,
1194 int ldb_partition_module_init(const char *version
)
1196 LDB_MODULE_CHECK_VERSION(version
);
1197 return ldb_register_module(&ldb_partition_module_ops
);