use unsigned instead of uint32_t for LDB counters.
[Samba/cd1.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
blob828dca1f2cbef60f08c9efbc45c4e0c173fde956
1 /*
2 ldb database library
4 Copyright (C) Andrew Tridgell 2004-2009
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 * Name: ldb
27 * Component: ldb tdb backend - indexing
29 * Description: indexing routines for ldb tdb backend
31 * Author: Andrew Tridgell
34 #include "ldb_tdb.h"
/* a counted array of DN values, as read from or written to an index record */
struct dn_list {
	unsigned int count;	/* number of entries in dn[] */
	struct ldb_val *dn;	/* the DN values */
};
/* state kept on ltdb->idxptr while a transaction is active */
struct ltdb_idxptr {
	/* in-memory tdb, keyed by linearized index DN; each record holds
	   a pointer to a struct dn_list */
	struct tdb_context *itdb;
	/* result of writing the in-memory entries back at commit time */
	int error;
};
/* Index entries carry an @IDXVERSION attribute holding this value, so
   that a record written by an older version of the indexing code can
   be recognised. */
#define LTDB_INDEXING_VERSION 2
51 /* enable the idxptr mode when transactions start */
52 int ltdb_index_transaction_start(struct ldb_module *module)
54 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
55 ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
56 return LDB_SUCCESS;
59 /* compare two DN entries in a dn_list. Take account of possible
60 * differences in string termination */
61 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
63 if (v1->length > v2->length && v1->data[v2->length] != 0) {
64 return -1;
66 if (v1->length < v2->length && v2->data[v1->length] != 0) {
67 return 1;
69 return strncmp((char *)v1->data, (char *)v2->data, v1->length);
74 find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75 comparison with the dn returns -1 if not found
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
79 unsigned int i;
80 for (i=0; i<list->count; i++) {
81 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
83 return -1;
87 find a entry in a dn_list. Uses a case sensitive comparison with the dn
88 returns -1 if not found
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
92 struct ldb_val v;
93 v.data = discard_const_p(unsigned char, dn);
94 v.length = strlen(dn);
95 return ltdb_dn_list_find_val(list, &v);
98 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
100 struct dn_list *list;
101 if (rec.dsize != sizeof(void *)) {
102 ldb_asprintf_errstring(ldb_module_get_ctx(module),
103 "Bad data size for idxptr %u", (unsigned)rec.dsize);
104 return NULL;
106 memcpy(&list, rec.dptr, sizeof(void *));
107 list = talloc_get_type(list, struct dn_list);
108 if (list == NULL) {
109 ldb_asprintf_errstring(ldb_module_get_ctx(module),
110 "Bad type '%s' for idxptr",
111 talloc_get_name(list));
112 return NULL;
114 if (check_parent && list->dn && talloc_parent(list->dn) != list) {
115 ldb_asprintf_errstring(ldb_module_get_ctx(module),
116 "Bad parent '%s' for idxptr",
117 talloc_get_name(talloc_parent(list->dn)));
118 return NULL;
120 return list;
124 return the @IDX list in an index entry for a dn as a
125 struct dn_list
127 static int ltdb_dn_list_load(struct ldb_module *module,
128 struct ldb_dn *dn, struct dn_list *list)
130 struct ldb_message *msg;
131 int ret;
132 struct ldb_message_element *el;
133 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
134 TDB_DATA rec;
135 struct dn_list *list2;
136 TDB_DATA key;
138 list->dn = NULL;
139 list->count = 0;
141 /* see if we have any in-memory index entries */
142 if (ltdb->idxptr == NULL ||
143 ltdb->idxptr->itdb == NULL) {
144 goto normal_index;
147 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
148 key.dsize = strlen((char *)key.dptr);
150 rec = tdb_fetch(ltdb->idxptr->itdb, key);
151 if (rec.dptr == NULL) {
152 goto normal_index;
155 /* we've found an in-memory index entry */
156 list2 = ltdb_index_idxptr(module, rec, true);
157 if (list2 == NULL) {
158 free(rec.dptr);
159 return LDB_ERR_OPERATIONS_ERROR;
161 free(rec.dptr);
163 *list = *list2;
164 return LDB_SUCCESS;
166 normal_index:
167 msg = ldb_msg_new(list);
168 if (msg == NULL) {
169 return LDB_ERR_OPERATIONS_ERROR;
172 ret = ltdb_search_dn1(module, dn, msg);
173 if (ret != LDB_SUCCESS) {
174 return ret;
177 /* TODO: check indexing version number */
179 el = ldb_msg_find_element(msg, LTDB_IDX);
180 if (!el) {
181 talloc_free(msg);
182 return LDB_SUCCESS;
185 /* we avoid copying the strings by stealing the list */
186 list->dn = talloc_steal(list, el->values);
187 list->count = el->num_values;
189 return LDB_SUCCESS;
194 save a dn_list into a full @IDX style record
196 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn,
197 struct dn_list *list)
199 struct ldb_message *msg;
200 int ret;
202 if (list->count == 0) {
203 ret = ltdb_delete_noindex(module, dn);
204 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
205 return LDB_SUCCESS;
207 return ret;
210 msg = ldb_msg_new(module);
211 if (!msg) {
212 ldb_module_oom(module);
213 return LDB_ERR_OPERATIONS_ERROR;
216 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
217 if (ret != LDB_SUCCESS) {
218 talloc_free(msg);
219 ldb_module_oom(module);
220 return LDB_ERR_OPERATIONS_ERROR;
223 msg->dn = dn;
224 if (list->count > 0) {
225 struct ldb_message_element *el;
227 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
228 if (ret != LDB_SUCCESS) {
229 ldb_module_oom(module);
230 talloc_free(msg);
231 return LDB_ERR_OPERATIONS_ERROR;
233 el->values = list->dn;
234 el->num_values = list->count;
237 ret = ltdb_store(module, msg, TDB_REPLACE);
238 talloc_free(msg);
239 return ret;
243 save a dn_list into the database, in either @IDX or internal format
245 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
246 struct dn_list *list)
248 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
249 TDB_DATA rec, key;
250 int ret;
251 struct dn_list *list2;
253 if (ltdb->idxptr == NULL) {
254 return ltdb_dn_list_store_full(module, dn, list);
257 if (ltdb->idxptr->itdb == NULL) {
258 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
259 if (ltdb->idxptr->itdb == NULL) {
260 return LDB_ERR_OPERATIONS_ERROR;
264 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
265 key.dsize = strlen((char *)key.dptr);
267 rec = tdb_fetch(ltdb->idxptr->itdb, key);
268 if (rec.dptr != NULL) {
269 list2 = ltdb_index_idxptr(module, rec, false);
270 if (list2 == NULL) {
271 free(rec.dptr);
272 return LDB_ERR_OPERATIONS_ERROR;
274 free(rec.dptr);
275 list2->dn = talloc_steal(list2, list->dn);
276 list2->count = list->count;
277 return LDB_SUCCESS;
280 list2 = talloc(ltdb->idxptr, struct dn_list);
281 if (list2 == NULL) {
282 return LDB_ERR_OPERATIONS_ERROR;
284 list2->dn = talloc_steal(list2, list->dn);
285 list2->count = list->count;
287 rec.dptr = (uint8_t *)&list2;
288 rec.dsize = sizeof(void *);
290 ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
291 if (ret == -1) {
292 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
294 return LDB_SUCCESS;
298 traverse function for storing the in-memory index entries on disk
300 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
302 struct ldb_module *module = state;
303 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
304 struct ldb_dn *dn;
305 struct ldb_context *ldb = ldb_module_get_ctx(module);
306 struct ldb_val v;
307 struct dn_list *list;
309 list = ltdb_index_idxptr(module, data, true);
310 if (list == NULL) {
311 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
312 return -1;
315 v.data = key.dptr;
316 v.length = strnlen((char *)key.dptr, key.dsize);
318 dn = ldb_dn_from_ldb_val(module, ldb, &v);
319 if (dn == NULL) {
320 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
321 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
322 return -1;
325 ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
326 talloc_free(dn);
327 if (ltdb->idxptr->error != 0) {
328 return -1;
330 return 0;
333 /* cleanup the idxptr mode when transaction commits */
334 int ltdb_index_transaction_commit(struct ldb_module *module)
336 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
337 int ret;
339 struct ldb_context *ldb = ldb_module_get_ctx(module);
341 ldb_reset_err_string(ldb);
342 if (ltdb->idxptr->itdb) {
343 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
344 tdb_close(ltdb->idxptr->itdb);
347 ret = ltdb->idxptr->error;
349 if (ret != LDB_SUCCESS) {
350 if (!ldb_errstring(ldb)) {
351 ldb_set_errstring(ldb, ldb_strerror(ret));
353 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
356 talloc_free(ltdb->idxptr);
357 ltdb->idxptr = NULL;
358 return ret;
361 /* cleanup the idxptr mode when transaction cancels */
362 int ltdb_index_transaction_cancel(struct ldb_module *module)
364 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
365 if (ltdb->idxptr && ltdb->idxptr->itdb) {
366 tdb_close(ltdb->idxptr->itdb);
368 talloc_free(ltdb->idxptr);
369 ltdb->idxptr = NULL;
370 return LDB_SUCCESS;
375 return the dn key to be used for an index
376 the caller is responsible for freeing
378 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
379 const char *attr, const struct ldb_val *value,
380 const struct ldb_schema_attribute **ap)
382 struct ldb_dn *ret;
383 struct ldb_val v;
384 const struct ldb_schema_attribute *a;
385 char *attr_folded;
386 int r;
388 attr_folded = ldb_attr_casefold(ldb, attr);
389 if (!attr_folded) {
390 return NULL;
393 a = ldb_schema_attribute_by_name(ldb, attr);
394 if (ap) {
395 *ap = a;
397 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
398 if (r != LDB_SUCCESS) {
399 const char *errstr = ldb_errstring(ldb);
400 /* canonicalisation can be refused. For example,
401 a attribute that takes wildcards will refuse to canonicalise
402 if the value contains a wildcard */
403 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
404 attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
405 talloc_free(attr_folded);
406 return NULL;
408 if (ldb_should_b64_encode(ldb, &v)) {
409 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
410 if (!vstr) return NULL;
411 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
412 talloc_free(vstr);
413 } else {
414 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
417 if (v.data != value->data) {
418 talloc_free(v.data);
420 talloc_free(attr_folded);
422 return ret;
426 see if a attribute value is in the list of indexed attributes
428 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
430 unsigned int i;
431 struct ldb_message_element *el;
433 el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
434 if (el == NULL) {
435 return false;
438 /* TODO: this is too expensive! At least use a binary search */
439 for (i=0; i<el->num_values; i++) {
440 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
441 return true;
444 return false;
448 in the following logic functions, the return value is treated as
449 follows:
451 LDB_SUCCESS: we found some matching index values
453 LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
455 LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
456 we'll need a full search
460 return a list of dn's that might match a simple indexed search (an
461 equality search only)
463 static int ltdb_index_dn_simple(struct ldb_module *module,
464 const struct ldb_parse_tree *tree,
465 const struct ldb_message *index_list,
466 struct dn_list *list)
468 struct ldb_context *ldb;
469 struct ldb_dn *dn;
470 int ret;
472 ldb = ldb_module_get_ctx(module);
474 list->count = 0;
475 list->dn = NULL;
477 /* if the attribute isn't in the list of indexed attributes then
478 this node needs a full search */
479 if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
480 return LDB_ERR_OPERATIONS_ERROR;
483 /* the attribute is indexed. Pull the list of DNs that match the
484 search criterion */
485 dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
486 if (!dn) return LDB_ERR_OPERATIONS_ERROR;
488 ret = ltdb_dn_list_load(module, dn, list);
489 talloc_free(dn);
490 return ret;
494 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
497 return a list of dn's that might match a leaf indexed search
499 static int ltdb_index_dn_leaf(struct ldb_module *module,
500 const struct ldb_parse_tree *tree,
501 const struct ldb_message *index_list,
502 struct dn_list *list)
504 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
505 list->dn = talloc_array(list, struct ldb_val, 1);
506 if (list->dn == NULL) {
507 ldb_module_oom(module);
508 return LDB_ERR_OPERATIONS_ERROR;
510 list->dn[0] = tree->u.equality.value;
511 list->count = 1;
512 return LDB_SUCCESS;
514 return ltdb_index_dn_simple(module, tree, index_list, list);
519 list intersection
520 list = list & list2
522 static bool list_intersect(struct ldb_context *ldb,
523 struct dn_list *list, const struct dn_list *list2)
525 struct dn_list *list3;
526 unsigned int i;
528 if (list->count == 0) {
529 /* 0 & X == 0 */
530 return true;
532 if (list2->count == 0) {
533 /* X & 0 == 0 */
534 list->count = 0;
535 list->dn = NULL;
536 return true;
539 /* the indexing code is allowed to return a longer list than
540 what really matches, as all results are filtered by the
541 full expression at the end - this shortcut avoids a lot of
542 work in some cases */
543 if (list->count < 2 && list2->count > 10) {
544 return true;
546 if (list2->count < 2 && list->count > 10) {
547 list->count = list2->count;
548 list->dn = list2->dn;
549 /* note that list2 may not be the parent of list2->dn,
550 as list2->dn may be owned by ltdb->idxptr. In that
551 case we expect this reparent call to fail, which is
552 OK */
553 talloc_reparent(list2, list, list2->dn);
554 return true;
557 list3 = talloc_zero(list, struct dn_list);
558 if (list3 == NULL) {
559 return false;
562 list3->dn = talloc_array(list3, struct ldb_val, list->count);
563 if (!list3->dn) {
564 talloc_free(list3);
565 return false;
567 list3->count = 0;
569 for (i=0;i<list->count;i++) {
570 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
571 list3->dn[list3->count] = list->dn[i];
572 list3->count++;
576 list->dn = talloc_steal(list, list3->dn);
577 list->count = list3->count;
578 talloc_free(list3);
580 return true;
585 list union
586 list = list | list2
588 static bool list_union(struct ldb_context *ldb,
589 struct dn_list *list, const struct dn_list *list2)
591 struct ldb_val *dn3;
593 if (list2->count == 0) {
594 /* X | 0 == X */
595 return true;
598 if (list->count == 0) {
599 /* 0 | X == X */
600 list->count = list2->count;
601 list->dn = list2->dn;
602 /* note that list2 may not be the parent of list2->dn,
603 as list2->dn may be owned by ltdb->idxptr. In that
604 case we expect this reparent call to fail, which is
605 OK */
606 talloc_reparent(list2, list, list2->dn);
607 return true;
610 dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
611 if (!dn3) {
612 ldb_oom(ldb);
613 return false;
616 /* we allow for duplicates here, and get rid of them later */
617 memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
618 memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
620 list->dn = dn3;
621 list->count += list2->count;
623 return true;
626 static int ltdb_index_dn(struct ldb_module *module,
627 const struct ldb_parse_tree *tree,
628 const struct ldb_message *index_list,
629 struct dn_list *list);
633 process an OR list (a union)
635 static int ltdb_index_dn_or(struct ldb_module *module,
636 const struct ldb_parse_tree *tree,
637 const struct ldb_message *index_list,
638 struct dn_list *list)
640 struct ldb_context *ldb;
641 unsigned int i;
643 ldb = ldb_module_get_ctx(module);
645 list->dn = NULL;
646 list->count = 0;
648 for (i=0; i<tree->u.list.num_elements; i++) {
649 struct dn_list *list2;
650 int ret;
652 list2 = talloc_zero(list, struct dn_list);
653 if (list2 == NULL) {
654 return LDB_ERR_OPERATIONS_ERROR;
657 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
659 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
660 /* X || 0 == X */
661 talloc_free(list2);
662 continue;
665 if (ret != LDB_SUCCESS) {
666 /* X || * == * */
667 talloc_free(list2);
668 return ret;
671 if (!list_union(ldb, list, list2)) {
672 talloc_free(list2);
673 return LDB_ERR_OPERATIONS_ERROR;
677 if (list->count == 0) {
678 return LDB_ERR_NO_SUCH_OBJECT;
681 return LDB_SUCCESS;
686 NOT an index results
688 static int ltdb_index_dn_not(struct ldb_module *module,
689 const struct ldb_parse_tree *tree,
690 const struct ldb_message *index_list,
691 struct dn_list *list)
693 /* the only way to do an indexed not would be if we could
694 negate the not via another not or if we knew the total
695 number of database elements so we could know that the
696 existing expression covered the whole database.
698 instead, we just give up, and rely on a full index scan
699 (unless an outer & manages to reduce the list)
701 return LDB_ERR_OPERATIONS_ERROR;
705 static bool ltdb_index_unique(struct ldb_context *ldb,
706 const char *attr)
708 const struct ldb_schema_attribute *a;
709 a = ldb_schema_attribute_by_name(ldb, attr);
710 if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
711 return true;
713 return false;
717 process an AND expression (intersection)
719 static int ltdb_index_dn_and(struct ldb_module *module,
720 const struct ldb_parse_tree *tree,
721 const struct ldb_message *index_list,
722 struct dn_list *list)
724 struct ldb_context *ldb;
725 unsigned int i;
726 bool found;
728 ldb = ldb_module_get_ctx(module);
730 list->dn = NULL;
731 list->count = 0;
733 /* in the first pass we only look for unique simple
734 equality tests, in the hope of avoiding having to look
735 at any others */
736 for (i=0; i<tree->u.list.num_elements; i++) {
737 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
738 int ret;
740 if (subtree->operation != LDB_OP_EQUALITY ||
741 !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
742 continue;
745 ret = ltdb_index_dn(module, subtree, index_list, list);
746 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
747 /* 0 && X == 0 */
748 return LDB_ERR_NO_SUCH_OBJECT;
750 if (ret == LDB_SUCCESS) {
751 /* a unique index match means we can
752 * stop. Note that we don't care if we return
753 * a few too many objects, due to later
754 * filtering */
755 return LDB_SUCCESS;
759 /* now do a full intersection */
760 found = false;
762 for (i=0; i<tree->u.list.num_elements; i++) {
763 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
764 struct dn_list *list2;
765 int ret;
767 list2 = talloc_zero(list, struct dn_list);
768 if (list2 == NULL) {
769 ldb_module_oom(module);
770 return LDB_ERR_OPERATIONS_ERROR;
773 ret = ltdb_index_dn(module, subtree, index_list, list2);
775 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
776 /* X && 0 == 0 */
777 list->dn = NULL;
778 list->count = 0;
779 talloc_free(list2);
780 return LDB_ERR_NO_SUCH_OBJECT;
783 if (ret != LDB_SUCCESS) {
784 /* this didn't adding anything */
785 talloc_free(list2);
786 continue;
789 if (!found) {
790 talloc_reparent(list2, list, list->dn);
791 list->dn = list2->dn;
792 list->count = list2->count;
793 found = true;
794 } else if (!list_intersect(ldb, list, list2)) {
795 talloc_free(list2);
796 return LDB_ERR_OPERATIONS_ERROR;
799 if (list->count == 0) {
800 list->dn = NULL;
801 return LDB_ERR_NO_SUCH_OBJECT;
804 if (list->count < 2) {
805 /* it isn't worth loading the next part of the tree */
806 return LDB_SUCCESS;
810 if (!found) {
811 /* none of the attributes were indexed */
812 return LDB_ERR_OPERATIONS_ERROR;
815 return LDB_SUCCESS;
819 return a list of matching objects using a one-level index
821 static int ltdb_index_dn_one(struct ldb_module *module,
822 struct ldb_dn *parent_dn,
823 struct dn_list *list)
825 struct ldb_context *ldb;
826 struct ldb_dn *key;
827 struct ldb_val val;
828 int ret;
830 ldb = ldb_module_get_ctx(module);
832 /* work out the index key from the parent DN */
833 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
834 val.length = strlen((char *)val.data);
835 key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
836 if (!key) {
837 ldb_oom(ldb);
838 return LDB_ERR_OPERATIONS_ERROR;
841 ret = ltdb_dn_list_load(module, key, list);
842 talloc_free(key);
843 if (ret != LDB_SUCCESS) {
844 return ret;
847 if (list->count == 0) {
848 return LDB_ERR_NO_SUCH_OBJECT;
851 return LDB_SUCCESS;
855 return a list of dn's that might match a indexed search or
856 an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
858 static int ltdb_index_dn(struct ldb_module *module,
859 const struct ldb_parse_tree *tree,
860 const struct ldb_message *index_list,
861 struct dn_list *list)
863 int ret = LDB_ERR_OPERATIONS_ERROR;
865 switch (tree->operation) {
866 case LDB_OP_AND:
867 ret = ltdb_index_dn_and(module, tree, index_list, list);
868 break;
870 case LDB_OP_OR:
871 ret = ltdb_index_dn_or(module, tree, index_list, list);
872 break;
874 case LDB_OP_NOT:
875 ret = ltdb_index_dn_not(module, tree, index_list, list);
876 break;
878 case LDB_OP_EQUALITY:
879 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
880 break;
882 case LDB_OP_SUBSTRING:
883 case LDB_OP_GREATER:
884 case LDB_OP_LESS:
885 case LDB_OP_PRESENT:
886 case LDB_OP_APPROX:
887 case LDB_OP_EXTENDED:
888 /* we can't index with fancy bitops yet */
889 ret = LDB_ERR_OPERATIONS_ERROR;
890 break;
893 return ret;
897 filter a candidate dn_list from an indexed search into a set of results
898 extracting just the given attributes
900 static int ltdb_index_filter(const struct dn_list *dn_list,
901 struct ltdb_context *ac,
902 uint32_t *match_count)
904 struct ldb_context *ldb;
905 struct ldb_message *msg;
906 unsigned int i;
908 ldb = ldb_module_get_ctx(ac->module);
910 for (i = 0; i < dn_list->count; i++) {
911 struct ldb_dn *dn;
912 int ret;
914 msg = ldb_msg_new(ac);
915 if (!msg) {
916 return LDB_ERR_OPERATIONS_ERROR;
919 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
920 if (dn == NULL) {
921 talloc_free(msg);
922 return LDB_ERR_OPERATIONS_ERROR;
925 ret = ltdb_search_dn1(ac->module, dn, msg);
926 talloc_free(dn);
927 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
928 /* the record has disappeared? yes, this can happen */
929 talloc_free(msg);
930 continue;
933 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
934 /* an internal error */
935 talloc_free(msg);
936 return LDB_ERR_OPERATIONS_ERROR;
939 if (!ldb_match_msg(ldb, msg,
940 ac->tree, ac->base, ac->scope)) {
941 talloc_free(msg);
942 continue;
945 /* filter the attributes that the user wants */
946 ret = ltdb_filter_attrs(msg, ac->attrs);
948 if (ret == -1) {
949 talloc_free(msg);
950 return LDB_ERR_OPERATIONS_ERROR;
953 ret = ldb_module_send_entry(ac->req, msg, NULL);
954 if (ret != LDB_SUCCESS) {
955 ac->request_terminated = true;
956 return ret;
959 (*match_count)++;
962 return LDB_SUCCESS;
966 remove any duplicated entries in a indexed result
968 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
970 unsigned int i, new_count;
972 if (list->count < 2) {
973 return;
976 TYPESAFE_QSORT(list->dn, list->count, dn_list_cmp);
978 new_count = 1;
979 for (i=1; i<list->count; i++) {
980 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
981 if (new_count != i) {
982 list->dn[new_count] = list->dn[i];
984 new_count++;
988 list->count = new_count;
992 search the database with a LDAP-like expression using indexes
993 returns -1 if an indexed search is not possible, in which
994 case the caller should call ltdb_search_full()
996 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
998 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
999 struct dn_list *dn_list;
1000 int ret;
1002 /* see if indexing is enabled */
1003 if (!ltdb->cache->attribute_indexes &&
1004 !ltdb->cache->one_level_indexes &&
1005 ac->scope != LDB_SCOPE_BASE) {
1006 /* fallback to a full search */
1007 return LDB_ERR_OPERATIONS_ERROR;
1010 dn_list = talloc_zero(ac, struct dn_list);
1011 if (dn_list == NULL) {
1012 ldb_module_oom(ac->module);
1013 return LDB_ERR_OPERATIONS_ERROR;
1016 switch (ac->scope) {
1017 case LDB_SCOPE_BASE:
1018 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1019 if (dn_list->dn == NULL) {
1020 ldb_module_oom(ac->module);
1021 talloc_free(dn_list);
1022 return LDB_ERR_OPERATIONS_ERROR;
1024 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1025 if (dn_list->dn[0].data == NULL) {
1026 ldb_module_oom(ac->module);
1027 talloc_free(dn_list);
1028 return LDB_ERR_OPERATIONS_ERROR;
1030 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1031 dn_list->count = 1;
1032 break;
1034 case LDB_SCOPE_ONELEVEL:
1035 if (!ltdb->cache->one_level_indexes) {
1036 talloc_free(dn_list);
1037 return LDB_ERR_OPERATIONS_ERROR;
1039 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1040 if (ret != LDB_SUCCESS) {
1041 talloc_free(dn_list);
1042 return ret;
1044 break;
1046 case LDB_SCOPE_SUBTREE:
1047 case LDB_SCOPE_DEFAULT:
1048 if (!ltdb->cache->attribute_indexes) {
1049 talloc_free(dn_list);
1050 return LDB_ERR_OPERATIONS_ERROR;
1052 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1053 if (ret != LDB_SUCCESS) {
1054 talloc_free(dn_list);
1055 return ret;
1057 ltdb_dn_list_remove_duplicates(dn_list);
1058 break;
1061 ret = ltdb_index_filter(dn_list, ac, match_count);
1062 talloc_free(dn_list);
1063 return ret;
1067 add an index entry for one message element
1069 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1070 struct ldb_message_element *el, int v_idx)
1072 struct ldb_context *ldb;
1073 struct ldb_dn *dn_key;
1074 int ret;
1075 const struct ldb_schema_attribute *a;
1076 struct dn_list *list;
1077 unsigned alloc_len;
1079 ldb = ldb_module_get_ctx(module);
1081 list = talloc_zero(module, struct dn_list);
1082 if (list == NULL) {
1083 return LDB_ERR_OPERATIONS_ERROR;
1086 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1087 if (!dn_key) {
1088 talloc_free(list);
1089 return LDB_ERR_OPERATIONS_ERROR;
1091 talloc_steal(list, dn_key);
1093 ret = ltdb_dn_list_load(module, dn_key, list);
1094 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1095 talloc_free(list);
1096 return ret;
1099 if (ltdb_dn_list_find_str(list, dn) != -1) {
1100 talloc_free(list);
1101 return LDB_SUCCESS;
1104 if (list->count > 0 &&
1105 a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1106 talloc_free(list);
1107 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1108 el->name, dn);
1109 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1112 /* overallocate the list a bit, to reduce the number of
1113 * realloc trigered copies */
1114 alloc_len = ((list->count+1)+7) & ~7;
1115 list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1116 if (list->dn == NULL) {
1117 talloc_free(list);
1118 return LDB_ERR_OPERATIONS_ERROR;
1120 list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1121 list->dn[list->count].length = strlen(dn);
1122 list->count++;
1124 ret = ltdb_dn_list_store(module, dn_key, list);
1126 talloc_free(list);
1128 return ret;
1132 add index entries for one elements in a message
1134 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1135 struct ldb_message_element *el)
1137 unsigned int i;
1138 for (i = 0; i < el->num_values; i++) {
1139 int ret = ltdb_index_add1(module, dn, el, i);
1140 if (ret != LDB_SUCCESS) {
1141 return ret;
1145 return LDB_SUCCESS;
1149 add index entries for all elements in a message
1151 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1152 struct ldb_message_element *elements, int num_el)
1154 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1155 unsigned int i;
1157 if (dn[0] == '@') {
1158 return LDB_SUCCESS;
1161 if (ltdb->cache->indexlist->num_elements == 0) {
1162 /* no indexed fields */
1163 return LDB_SUCCESS;
1166 for (i = 0; i < num_el; i++) {
1167 int ret;
1168 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1169 continue;
1171 ret = ltdb_index_add_el(module, dn, &elements[i]);
1172 if (ret != LDB_SUCCESS) {
1173 struct ldb_context *ldb = ldb_module_get_ctx(module);
1174 ldb_asprintf_errstring(ldb,
1175 __location__ ": Failed to re-index %s in %s - %s",
1176 elements[i].name, dn, ldb_errstring(ldb));
1177 return ret;
1181 return LDB_SUCCESS;
1186 insert a one level index for a message
1188 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1190 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1191 struct ldb_message_element el;
1192 struct ldb_val val;
1193 struct ldb_dn *pdn;
1194 const char *dn;
1195 int ret;
1197 /* We index for ONE Level only if requested */
1198 if (!ltdb->cache->one_level_indexes) {
1199 return LDB_SUCCESS;
1202 pdn = ldb_dn_get_parent(module, msg->dn);
1203 if (pdn == NULL) {
1204 return LDB_ERR_OPERATIONS_ERROR;
1207 dn = ldb_dn_get_linearized(msg->dn);
1208 if (dn == NULL) {
1209 talloc_free(pdn);
1210 return LDB_ERR_OPERATIONS_ERROR;
1213 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1214 if (val.data == NULL) {
1215 talloc_free(pdn);
1216 return LDB_ERR_OPERATIONS_ERROR;
1219 val.length = strlen((char *)val.data);
1220 el.name = LTDB_IDXONE;
1221 el.values = &val;
1222 el.num_values = 1;
1224 if (add) {
1225 ret = ltdb_index_add1(module, dn, &el, 0);
1226 } else { /* delete */
1227 ret = ltdb_index_del_value(module, msg->dn, &el, 0);
1230 talloc_free(pdn);
1232 return ret;
1236 add the index entries for a new element in a record
1237 The caller guarantees that these element values are not yet indexed
1239 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn,
1240 struct ldb_message_element *el)
1242 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1243 if (ldb_dn_is_special(dn)) {
1244 return LDB_SUCCESS;
1246 if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1247 return LDB_SUCCESS;
1249 return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1253 add the index entries for a new record
1255 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1257 const char *dn;
1258 int ret;
1260 if (ldb_dn_is_special(msg->dn)) {
1261 return LDB_SUCCESS;
1264 dn = ldb_dn_get_linearized(msg->dn);
1265 if (dn == NULL) {
1266 return LDB_ERR_OPERATIONS_ERROR;
1269 ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1270 if (ret != LDB_SUCCESS) {
1271 return ret;
1274 return ltdb_index_onelevel(module, msg, 1);
1279 delete an index entry for one message element
1281 int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
1282 struct ldb_message_element *el, unsigned int v_idx)
1284 struct ldb_context *ldb;
1285 struct ldb_dn *dn_key;
1286 const char *dn_str;
1287 int ret, i;
1288 unsigned int j;
1289 struct dn_list *list;
1291 ldb = ldb_module_get_ctx(module);
1293 dn_str = ldb_dn_get_linearized(dn);
1294 if (dn_str == NULL) {
1295 return LDB_ERR_OPERATIONS_ERROR;
1298 if (dn_str[0] == '@') {
1299 return LDB_SUCCESS;
1302 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1303 if (!dn_key) {
1304 return LDB_ERR_OPERATIONS_ERROR;
1307 list = talloc_zero(dn_key, struct dn_list);
1308 if (list == NULL) {
1309 talloc_free(dn_key);
1310 return LDB_ERR_OPERATIONS_ERROR;
1313 ret = ltdb_dn_list_load(module, dn_key, list);
1314 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1315 /* it wasn't indexed. Did we have an earlier error? If we did then
1316 its gone now */
1317 talloc_free(dn_key);
1318 return LDB_SUCCESS;
1321 if (ret != LDB_SUCCESS) {
1322 talloc_free(dn_key);
1323 return ret;
1326 i = ltdb_dn_list_find_str(list, dn_str);
1327 if (i == -1) {
1328 /* nothing to delete */
1329 talloc_free(dn_key);
1330 return LDB_SUCCESS;
1333 j = (unsigned int) i;
1334 if (j != list->count - 1) {
1335 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1337 list->count--;
1338 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1340 ret = ltdb_dn_list_store(module, dn_key, list);
1342 talloc_free(dn_key);
1344 return ret;
1348 delete the index entries for a element
1349 return -1 on failure
1351 int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
1352 struct ldb_message_element *el)
1354 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1355 const char *dn_str;
1356 int ret;
1357 unsigned int i;
1359 if (!ltdb->cache->attribute_indexes) {
1360 /* no indexed fields */
1361 return LDB_SUCCESS;
1364 dn_str = ldb_dn_get_linearized(dn);
1365 if (dn_str == NULL) {
1366 return LDB_ERR_OPERATIONS_ERROR;
1369 if (dn_str[0] == '@') {
1370 return LDB_SUCCESS;
1373 if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1374 return LDB_SUCCESS;
1376 for (i = 0; i < el->num_values; i++) {
1377 ret = ltdb_index_del_value(module, dn, el, i);
1378 if (ret != LDB_SUCCESS) {
1379 return ret;
1383 return LDB_SUCCESS;
1387 delete the index entries for a record
1388 return -1 on failure
1390 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1392 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1393 int ret;
1394 unsigned int i;
1396 if (ldb_dn_is_special(msg->dn)) {
1397 return LDB_SUCCESS;
1400 ret = ltdb_index_onelevel(module, msg, 0);
1401 if (ret != LDB_SUCCESS) {
1402 return ret;
1405 if (!ltdb->cache->attribute_indexes) {
1406 /* no indexed fields */
1407 return LDB_SUCCESS;
1410 for (i = 0; i < msg->num_elements; i++) {
1411 ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
1412 if (ret != LDB_SUCCESS) {
1413 return ret;
1417 return LDB_SUCCESS;
1422 traversal function that deletes all @INDEX records
1424 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1426 struct ldb_module *module = state;
1427 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1428 const char *dnstr = "DN=" LTDB_INDEX ":";
1429 struct dn_list list;
1430 struct ldb_dn *dn;
1431 struct ldb_val v;
1432 int ret;
1434 if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1435 return 0;
1437 /* we need to put a empty list in the internal tdb for this
1438 * index entry */
1439 list.dn = NULL;
1440 list.count = 0;
1441 v.data = key.dptr;
1442 v.length = strnlen((char *)key.dptr, key.dsize);
1444 dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1445 ret = ltdb_dn_list_store(module, dn, &list);
1446 if (ret != LDB_SUCCESS) {
1447 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1448 "Unable to store null index for %s\n",
1449 ldb_dn_get_linearized(dn));
1450 talloc_free(dn);
1451 return -1;
1453 talloc_free(dn);
1454 return 0;
/* state handed to the re_index() traversal callback by ltdb_reindex() */
struct ltdb_reindex_context {
	struct ldb_module *module;	/* module whose records are reindexed */
	int error;			/* LDB error code recorded by re_index() */
};
1463 traversal function that adds @INDEX records during a re index
1465 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1467 struct ldb_context *ldb;
1468 struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1469 struct ldb_module *module = ctx->module;
1470 struct ldb_message *msg;
1471 const char *dn = NULL;
1472 int ret;
1473 TDB_DATA key2;
1475 ldb = ldb_module_get_ctx(module);
1477 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1478 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1479 return 0;
1482 msg = ldb_msg_new(module);
1483 if (msg == NULL) {
1484 return -1;
1487 ret = ltdb_unpack_data(module, &data, msg);
1488 if (ret != 0) {
1489 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1490 ldb_dn_get_linearized(msg->dn));
1491 talloc_free(msg);
1492 return -1;
1495 /* check if the DN key has changed, perhaps due to the
1496 case insensitivity of an element changing */
1497 key2 = ltdb_key(module, msg->dn);
1498 if (key2.dptr == NULL) {
1499 /* probably a corrupt record ... darn */
1500 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1501 ldb_dn_get_linearized(msg->dn));
1502 talloc_free(msg);
1503 return 0;
1505 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1506 tdb_delete(tdb, key);
1507 tdb_store(tdb, key2, data, 0);
1509 talloc_free(key2.dptr);
1511 if (msg->dn == NULL) {
1512 dn = (char *)key.dptr + 3;
1513 } else {
1514 dn = ldb_dn_get_linearized(msg->dn);
1517 ret = ltdb_index_onelevel(module, msg, 1);
1518 if (ret != LDB_SUCCESS) {
1519 ldb_debug(ldb, LDB_DEBUG_ERROR,
1520 "Adding special ONE LEVEL index failed (%s)!",
1521 ldb_dn_get_linearized(msg->dn));
1522 talloc_free(msg);
1523 return -1;
1526 ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1528 if (ret != LDB_SUCCESS) {
1529 ctx->error = ret;
1530 talloc_free(msg);
1531 return -1;
1534 talloc_free(msg);
1536 return 0;
1540 force a complete reindex of the database
1542 int ltdb_reindex(struct ldb_module *module)
1544 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1545 int ret;
1546 struct ltdb_reindex_context ctx;
1548 if (ltdb_cache_reload(module) != 0) {
1549 return LDB_ERR_OPERATIONS_ERROR;
1552 /* first traverse the database deleting any @INDEX records by
1553 * putting NULL entries in the in-memory tdb
1555 ret = tdb_traverse(ltdb->tdb, delete_index, module);
1556 if (ret == -1) {
1557 return LDB_ERR_OPERATIONS_ERROR;
1560 /* if we don't have indexes we have nothing todo */
1561 if (ltdb->cache->indexlist->num_elements == 0) {
1562 return LDB_SUCCESS;
1565 ctx.module = module;
1566 ctx.error = 0;
1568 /* now traverse adding any indexes for normal LDB records */
1569 ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
1570 if (ret == -1) {
1571 struct ldb_context *ldb = ldb_module_get_ctx(module);
1572 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
1573 return LDB_ERR_OPERATIONS_ERROR;
1576 if (ctx.error != LDB_SUCCESS) {
1577 struct ldb_context *ldb = ldb_module_get_ctx(module);
1578 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1579 return ctx.error;
1582 return LDB_SUCCESS;