s4-ldb: when taking a list intersection, the result can be as long as the first list
[Samba.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
blob709a2e1626df067e7b154c31e1d0e60ea0fdd554
1 /*
2 ldb database library
4 Copyright (C) Andrew Tridgell 2004-2009
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 * Name: ldb
27 * Component: ldb tdb backend - indexing
29 * Description: indexing routines for ldb tdb backend
31 * Author: Andrew Tridgell
34 #include "ldb_tdb.h"
36 struct dn_list {
37 unsigned int count;
38 struct ldb_val *dn;
41 struct ltdb_idxptr {
42 struct tdb_context *itdb;
43 bool repack;
44 int error;
47 /* we put a @IDXVERSION attribute on index entries. This
48 allows us to tell if it was written by an older version
50 #define LTDB_INDEXING_VERSION 2
52 /* enable the idxptr mode when transactions start */
53 int ltdb_index_transaction_start(struct ldb_module *module)
55 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
56 ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
57 return LDB_SUCCESS;
60 /* compare two DN entries in a dn_list. Take account of possible
61 * differences in string termination */
62 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
64 int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
65 if (ret != 0) return ret;
66 if (v2->length > v1->length && v2->data[v1->length] != 0) {
67 return 1;
69 return 0;
74 find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75 comparison with the dn returns -1 if not found
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
79 int i;
80 for (i=0; i<list->count; i++) {
81 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
83 return -1;
87 find a entry in a dn_list. Uses a case sensitive comparison with the dn
88 returns -1 if not found
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
92 struct ldb_val v;
93 v.data = discard_const_p(unsigned char, dn);
94 v.length = strlen(dn);
95 return ltdb_dn_list_find_val(list, &v);
99 return the @IDX list in an index entry for a dn as a
100 struct dn_list
102 static int ltdb_dn_list_load(struct ldb_module *module,
103 struct ldb_dn *dn, struct dn_list *list)
105 struct ldb_message *msg;
106 int ret;
107 struct ldb_message_element *el;
108 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
109 TDB_DATA rec;
110 struct dn_list *list2;
111 TDB_DATA key;
113 list->dn = NULL;
114 list->count = 0;
116 /* see if we have any in-memory index entries */
117 if (ltdb->idxptr == NULL ||
118 ltdb->idxptr->itdb == NULL) {
119 goto normal_index;
122 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
123 key.dsize = strlen((char *)key.dptr);
125 rec = tdb_fetch(ltdb->idxptr->itdb, key);
126 if (rec.dptr == NULL) {
127 goto normal_index;
130 /* we've found an in-memory index entry */
131 if (rec.dsize != sizeof(void *)) {
132 free(rec.dptr);
133 ldb_asprintf_errstring(ldb_module_get_ctx(module),
134 "Bad internal index size %u", (unsigned)rec.dsize);
135 return LDB_ERR_OPERATIONS_ERROR;
137 list2 = *(struct dn_list **)rec.dptr;
138 free(rec.dptr);
140 *list = *list2;
141 return LDB_SUCCESS;
143 normal_index:
144 msg = ldb_msg_new(list);
145 if (msg == NULL) {
146 return LDB_ERR_OPERATIONS_ERROR;
149 ret = ltdb_search_dn1(module, dn, msg);
150 if (ret != LDB_SUCCESS) {
151 return ret;
154 /* TODO: check indexing version number */
156 el = ldb_msg_find_element(msg, LTDB_IDX);
157 if (!el) {
158 talloc_free(msg);
159 return LDB_SUCCESS;
162 /* we avoid copying the strings by stealing the list */
163 list->dn = talloc_steal(list, el->values);
164 list->count = el->num_values;
166 return LDB_SUCCESS;
171 save a dn_list into a full @IDX style record
173 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn,
174 struct dn_list *list)
176 struct ldb_message *msg;
177 int ret;
179 msg = ldb_msg_new(module);
180 if (!msg) {
181 ldb_module_oom(module);
182 return LDB_ERR_OPERATIONS_ERROR;
185 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
186 if (ret != LDB_SUCCESS) {
187 ldb_module_oom(module);
188 return LDB_ERR_OPERATIONS_ERROR;
191 msg->dn = dn;
192 if (list->count > 0) {
193 struct ldb_message_element *el;
195 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
196 if (ret != LDB_SUCCESS) {
197 ldb_module_oom(module);
198 talloc_free(msg);
199 return LDB_ERR_OPERATIONS_ERROR;
201 el->values = list->dn;
202 el->num_values = list->count;
205 ret = ltdb_store(module, msg, TDB_REPLACE);
206 talloc_free(msg);
207 return ret;
211 save a dn_list into the database, in either @IDX or internal format
213 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
214 struct dn_list *list)
216 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
217 TDB_DATA rec, key;
218 int ret;
219 struct dn_list *list2;
221 if (ltdb->idxptr == NULL) {
222 return ltdb_dn_list_store_full(module, dn, list);
225 if (ltdb->idxptr->itdb == NULL) {
226 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
227 if (ltdb->idxptr->itdb == NULL) {
228 return LDB_ERR_OPERATIONS_ERROR;
232 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
233 key.dsize = strlen((char *)key.dptr);
235 rec = tdb_fetch(ltdb->idxptr->itdb, key);
236 if (rec.dptr != NULL) {
237 if (rec.dsize != sizeof(void *)) {
238 free(rec.dptr);
239 ldb_asprintf_errstring(ldb_module_get_ctx(module),
240 "Bad internal index size %u", (unsigned)rec.dsize);
241 return LDB_ERR_OPERATIONS_ERROR;
243 list2 = *(struct dn_list **)rec.dptr;
244 free(rec.dptr);
245 list2->dn = talloc_steal(list2, list->dn);
246 list2->count = list->count;
247 return LDB_SUCCESS;
250 list2 = talloc(ltdb->idxptr, struct dn_list);
251 if (list2 == NULL) {
252 return LDB_ERR_OPERATIONS_ERROR;
254 list2->dn = talloc_steal(list2, list->dn);
255 list2->count = list->count;
257 rec.dptr = (uint8_t *)&list2;
258 rec.dsize = sizeof(void *);
260 ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
262 return ret;
266 traverse function for storing the in-memory index entries on disk
268 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
270 struct ldb_module *module = state;
271 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
272 struct ldb_dn *dn;
273 struct ldb_context *ldb = ldb_module_get_ctx(module);
274 struct ldb_val v;
275 struct dn_list *list;
277 if (data.dsize != sizeof(void *)) {
278 ldb_asprintf_errstring(ldb, "Bad internal index size %u", (unsigned)data.dsize);
279 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
280 return -1;
283 list = *(struct dn_list **)data.dptr;
285 v.data = key.dptr;
286 v.length = key.dsize;
288 dn = ldb_dn_from_ldb_val(module, ldb, &v);
289 if (dn == NULL) {
290 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
291 return -1;
294 ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
295 talloc_free(dn);
296 return ltdb->idxptr->error;
299 /* cleanup the idxptr mode when transaction commits */
300 int ltdb_index_transaction_commit(struct ldb_module *module)
302 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
303 int ret;
305 if (ltdb->idxptr->itdb) {
306 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
307 tdb_close(ltdb->idxptr->itdb);
310 ret = ltdb->idxptr->error;
312 if (ret != LDB_SUCCESS) {
313 struct ldb_context *ldb = ldb_module_get_ctx(module);
314 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
317 talloc_free(ltdb->idxptr);
318 ltdb->idxptr = NULL;
319 return ret;
322 /* cleanup the idxptr mode when transaction cancels */
323 int ltdb_index_transaction_cancel(struct ldb_module *module)
325 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
326 if (ltdb->idxptr && ltdb->idxptr->itdb) {
327 tdb_close(ltdb->idxptr->itdb);
329 talloc_free(ltdb->idxptr);
330 ltdb->idxptr = NULL;
331 return LDB_SUCCESS;
336 return the dn key to be used for an index
337 the caller is responsible for freeing
339 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
340 const char *attr, const struct ldb_val *value,
341 const struct ldb_schema_attribute **ap)
343 struct ldb_dn *ret;
344 struct ldb_val v;
345 const struct ldb_schema_attribute *a;
346 char *attr_folded;
347 int r;
349 attr_folded = ldb_attr_casefold(ldb, attr);
350 if (!attr_folded) {
351 return NULL;
354 a = ldb_schema_attribute_by_name(ldb, attr);
355 if (ap) {
356 *ap = a;
358 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
359 if (r != LDB_SUCCESS) {
360 const char *errstr = ldb_errstring(ldb);
361 /* canonicalisation can be refused. For example,
362 a attribute that takes wildcards will refuse to canonicalise
363 if the value contains a wildcard */
364 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
365 attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
366 talloc_free(attr_folded);
367 return NULL;
369 if (ldb_should_b64_encode(ldb, &v)) {
370 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
371 if (!vstr) return NULL;
372 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
373 talloc_free(vstr);
374 } else {
375 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
378 if (v.data != value->data) {
379 talloc_free(v.data);
381 talloc_free(attr_folded);
383 return ret;
387 see if a attribute value is in the list of indexed attributes
389 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
391 unsigned int i;
392 struct ldb_message_element *el;
394 el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
395 if (el == NULL) {
396 return false;
398 for (i=0; i<el->num_values; i++) {
399 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
400 return true;
403 return false;
407 in the following logic functions, the return value is treated as
408 follows:
410 LDB_SUCCESS: we found some matching index values
412 LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
414 LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
415 we'll need a full search
419 return a list of dn's that might match a simple indexed search (an
420 equality search only)
422 static int ltdb_index_dn_simple(struct ldb_module *module,
423 const struct ldb_parse_tree *tree,
424 const struct ldb_message *index_list,
425 struct dn_list *list)
427 struct ldb_context *ldb;
428 struct ldb_dn *dn;
429 int ret;
431 ldb = ldb_module_get_ctx(module);
433 list->count = 0;
434 list->dn = NULL;
436 /* if the attribute isn't in the list of indexed attributes then
437 this node needs a full search */
438 if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
439 return LDB_ERR_OPERATIONS_ERROR;
442 /* the attribute is indexed. Pull the list of DNs that match the
443 search criterion */
444 dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
445 if (!dn) return LDB_ERR_OPERATIONS_ERROR;
447 ret = ltdb_dn_list_load(module, dn, list);
448 talloc_free(dn);
449 return ret;
453 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
456 return a list of dn's that might match a leaf indexed search
458 static int ltdb_index_dn_leaf(struct ldb_module *module,
459 const struct ldb_parse_tree *tree,
460 const struct ldb_message *index_list,
461 struct dn_list *list)
463 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
464 list->dn = talloc_array(list, struct ldb_val, 1);
465 if (list->dn == NULL) {
466 ldb_module_oom(module);
467 return LDB_ERR_OPERATIONS_ERROR;
469 list->dn[0] = tree->u.equality.value;
470 list->count = 1;
471 return LDB_SUCCESS;
473 return ltdb_index_dn_simple(module, tree, index_list, list);
478 list intersection
479 list = list & list2
481 static bool list_intersect(struct ldb_context *ldb,
482 struct dn_list *list, const struct dn_list *list2)
484 struct dn_list *list3;
485 unsigned int i;
487 if (list->count == 0) {
488 /* 0 & X == 0 */
489 return true;
491 if (list2->count == 0) {
492 /* X & 0 == 0 */
493 list->count = 0;
494 list->dn = NULL;
495 return true;
498 /* the indexing code is allowed to return a longer list than
499 what really matches, as all results are filtered by the
500 full expression at the end - this shortcut avoids a lot of
501 work in some cases */
502 if (list->count < 2 && list2->count > 10) {
503 return true;
505 if (list2->count < 2 && list->count > 10) {
506 list->count = list2->count;
507 list->dn = list2->dn;
508 /* note that list2 may not be the parent of list2->dn,
509 as list2->dn may be owned by ltdb->idxptr. In that
510 case we expect this reparent call to fail, which is
511 OK */
512 talloc_reparent(list2, list, list2->dn);
513 return true;
516 list3 = talloc_zero(list, struct dn_list);
517 if (list3 == NULL) {
518 return false;
521 list3->dn = talloc_array(list3, struct ldb_val, list->count);
522 if (!list3->dn) {
523 talloc_free(list3);
524 return false;
526 list3->count = 0;
528 for (i=0;i<list->count;i++) {
529 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
530 list3->dn[list3->count] = list->dn[i];
531 list3->count++;
535 list->dn = talloc_steal(list, list3->dn);
536 list->count = list3->count;
537 talloc_free(list3);
539 return true;
544 list union
545 list = list | list2
547 static bool list_union(struct ldb_context *ldb,
548 struct dn_list *list, const struct dn_list *list2)
550 struct ldb_val *dn3;
552 if (list2->count == 0) {
553 /* X | 0 == X */
554 return true;
557 if (list->count == 0) {
558 /* 0 | X == X */
559 list->count = list2->count;
560 list->dn = list2->dn;
561 /* note that list2 may not be the parent of list2->dn,
562 as list2->dn may be owned by ltdb->idxptr. In that
563 case we expect this reparent call to fail, which is
564 OK */
565 talloc_reparent(list2, list, list2->dn);
566 return true;
569 dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
570 if (!dn3) {
571 ldb_oom(ldb);
572 return false;
575 /* we allow for duplicates here, and get rid of them later */
576 memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
577 memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
579 list->dn = dn3;
580 list->count += list2->count;
582 return true;
585 static int ltdb_index_dn(struct ldb_module *module,
586 const struct ldb_parse_tree *tree,
587 const struct ldb_message *index_list,
588 struct dn_list *list);
592 process an OR list (a union)
594 static int ltdb_index_dn_or(struct ldb_module *module,
595 const struct ldb_parse_tree *tree,
596 const struct ldb_message *index_list,
597 struct dn_list *list)
599 struct ldb_context *ldb;
600 unsigned int i;
602 ldb = ldb_module_get_ctx(module);
604 list->dn = NULL;
605 list->count = 0;
607 for (i=0; i<tree->u.list.num_elements; i++) {
608 struct dn_list *list2;
609 int ret;
611 list2 = talloc_zero(list, struct dn_list);
612 if (list2 == NULL) {
613 return LDB_ERR_OPERATIONS_ERROR;
616 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
618 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
619 /* X || 0 == X */
620 talloc_free(list2);
621 continue;
624 if (ret != LDB_SUCCESS) {
625 /* X || * == * */
626 talloc_free(list2);
627 return ret;
630 if (!list_union(ldb, list, list2)) {
631 talloc_free(list2);
632 return LDB_ERR_OPERATIONS_ERROR;
636 if (list->count == 0) {
637 return LDB_ERR_NO_SUCH_OBJECT;
640 return LDB_SUCCESS;
645 NOT an index results
647 static int ltdb_index_dn_not(struct ldb_module *module,
648 const struct ldb_parse_tree *tree,
649 const struct ldb_message *index_list,
650 struct dn_list *list)
652 /* the only way to do an indexed not would be if we could
653 negate the not via another not or if we knew the total
654 number of database elements so we could know that the
655 existing expression covered the whole database.
657 instead, we just give up, and rely on a full index scan
658 (unless an outer & manages to reduce the list)
660 return LDB_ERR_OPERATIONS_ERROR;
664 static bool ltdb_index_unique(struct ldb_context *ldb,
665 const char *attr)
667 const struct ldb_schema_attribute *a;
668 a = ldb_schema_attribute_by_name(ldb, attr);
669 if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
670 return true;
672 return false;
676 process an AND expression (intersection)
678 static int ltdb_index_dn_and(struct ldb_module *module,
679 const struct ldb_parse_tree *tree,
680 const struct ldb_message *index_list,
681 struct dn_list *list)
683 struct ldb_context *ldb;
684 unsigned int i;
685 bool found;
687 ldb = ldb_module_get_ctx(module);
689 list->dn = NULL;
690 list->count = 0;
692 /* in the first pass we only look for unique simple
693 equality tests, in the hope of avoiding having to look
694 at any others */
695 for (i=0; i<tree->u.list.num_elements; i++) {
696 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
697 int ret;
699 if (subtree->operation != LDB_OP_EQUALITY ||
700 !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
701 continue;
704 ret = ltdb_index_dn(module, subtree, index_list, list);
705 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
706 /* 0 && X == 0 */
707 return LDB_ERR_NO_SUCH_OBJECT;
709 if (ret == LDB_SUCCESS) {
710 /* a unique index match means we can
711 * stop. Note that we don't care if we return
712 * a few too many objects, due to later
713 * filtering */
714 return LDB_SUCCESS;
718 /* now do a full intersection */
719 found = false;
721 for (i=0; i<tree->u.list.num_elements; i++) {
722 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
723 struct dn_list *list2;
724 int ret;
726 list2 = talloc_zero(list, struct dn_list);
727 if (list2 == NULL) {
728 ldb_module_oom(module);
729 return LDB_ERR_OPERATIONS_ERROR;
732 ret = ltdb_index_dn(module, subtree, index_list, list2);
734 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
735 /* X && 0 == 0 */
736 list->dn = NULL;
737 list->count = 0;
738 talloc_free(list2);
739 return LDB_ERR_NO_SUCH_OBJECT;
742 if (ret != LDB_SUCCESS) {
743 /* this didn't adding anything */
744 talloc_free(list2);
745 continue;
748 if (!found) {
749 talloc_reparent(list2, list, list->dn);
750 list->dn = list2->dn;
751 list->count = list2->count;
752 found = true;
753 } else if (!list_intersect(ldb, list, list2)) {
754 talloc_free(list2);
755 return LDB_ERR_OPERATIONS_ERROR;
758 if (list->count == 0) {
759 list->dn = NULL;
760 return LDB_ERR_NO_SUCH_OBJECT;
763 if (list->count < 2) {
764 /* it isn't worth loading the next part of the tree */
765 return LDB_SUCCESS;
769 if (!found) {
770 /* none of the attributes were indexed */
771 return LDB_ERR_OPERATIONS_ERROR;
774 return LDB_SUCCESS;
778 return a list of matching objects using a one-level index
780 static int ltdb_index_dn_one(struct ldb_module *module,
781 struct ldb_dn *parent_dn,
782 struct dn_list *list)
784 struct ldb_context *ldb;
785 struct ldb_dn *key;
786 struct ldb_val val;
787 int ret;
789 ldb = ldb_module_get_ctx(module);
791 /* work out the index key from the parent DN */
792 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
793 val.length = strlen((char *)val.data);
794 key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
795 if (!key) {
796 ldb_oom(ldb);
797 return LDB_ERR_OPERATIONS_ERROR;
800 ret = ltdb_dn_list_load(module, key, list);
801 talloc_free(key);
802 if (ret != LDB_SUCCESS) {
803 return ret;
806 if (list->count == 0) {
807 return LDB_ERR_NO_SUCH_OBJECT;
810 return LDB_SUCCESS;
814 return a list of dn's that might match a indexed search or
815 an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
817 static int ltdb_index_dn(struct ldb_module *module,
818 const struct ldb_parse_tree *tree,
819 const struct ldb_message *index_list,
820 struct dn_list *list)
822 int ret = LDB_ERR_OPERATIONS_ERROR;
824 switch (tree->operation) {
825 case LDB_OP_AND:
826 ret = ltdb_index_dn_and(module, tree, index_list, list);
827 break;
829 case LDB_OP_OR:
830 ret = ltdb_index_dn_or(module, tree, index_list, list);
831 break;
833 case LDB_OP_NOT:
834 ret = ltdb_index_dn_not(module, tree, index_list, list);
835 break;
837 case LDB_OP_EQUALITY:
838 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
839 break;
841 case LDB_OP_SUBSTRING:
842 case LDB_OP_GREATER:
843 case LDB_OP_LESS:
844 case LDB_OP_PRESENT:
845 case LDB_OP_APPROX:
846 case LDB_OP_EXTENDED:
847 /* we can't index with fancy bitops yet */
848 ret = LDB_ERR_OPERATIONS_ERROR;
849 break;
852 return ret;
856 filter a candidate dn_list from an indexed search into a set of results
857 extracting just the given attributes
859 static int ltdb_index_filter(const struct dn_list *dn_list,
860 struct ltdb_context *ac,
861 uint32_t *match_count)
863 struct ldb_context *ldb;
864 struct ldb_message *msg;
865 unsigned int i;
867 ldb = ldb_module_get_ctx(ac->module);
869 for (i = 0; i < dn_list->count; i++) {
870 struct ldb_dn *dn;
871 int ret;
873 msg = ldb_msg_new(ac);
874 if (!msg) {
875 return LDB_ERR_OPERATIONS_ERROR;
878 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
879 if (dn == NULL) {
880 talloc_free(msg);
881 return LDB_ERR_OPERATIONS_ERROR;
884 ret = ltdb_search_dn1(ac->module, dn, msg);
885 talloc_free(dn);
886 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
887 /* the record has disappeared? yes, this can happen */
888 talloc_free(msg);
889 continue;
892 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
893 /* an internal error */
894 talloc_free(msg);
895 return LDB_ERR_OPERATIONS_ERROR;
898 if (!ldb_match_msg(ldb, msg,
899 ac->tree, ac->base, ac->scope)) {
900 talloc_free(msg);
901 continue;
904 /* filter the attributes that the user wants */
905 ret = ltdb_filter_attrs(msg, ac->attrs);
907 if (ret == -1) {
908 talloc_free(msg);
909 return LDB_ERR_OPERATIONS_ERROR;
912 ret = ldb_module_send_entry(ac->req, msg, NULL);
913 if (ret != LDB_SUCCESS) {
914 ac->request_terminated = true;
915 return ret;
918 (*match_count)++;
921 return LDB_SUCCESS;
925 remove any duplicated entries in a indexed result
927 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
929 int i, new_count;
931 if (list->count < 2) {
932 return;
935 qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
937 new_count = 1;
938 for (i=1; i<list->count; i++) {
939 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
940 if (new_count != i) {
941 list->dn[new_count] = list->dn[i];
943 new_count++;
947 list->count = new_count;
951 search the database with a LDAP-like expression using indexes
952 returns -1 if an indexed search is not possible, in which
953 case the caller should call ltdb_search_full()
955 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
957 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
958 struct dn_list *dn_list;
959 int ret;
961 /* see if indexing is enabled */
962 if (!ltdb->cache->attribute_indexes &&
963 !ltdb->cache->one_level_indexes &&
964 ac->scope != LDB_SCOPE_BASE) {
965 /* fallback to a full search */
966 return LDB_ERR_OPERATIONS_ERROR;
969 dn_list = talloc_zero(ac, struct dn_list);
970 if (dn_list == NULL) {
971 ldb_module_oom(ac->module);
972 return LDB_ERR_OPERATIONS_ERROR;
975 switch (ac->scope) {
976 case LDB_SCOPE_BASE:
977 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
978 if (dn_list->dn == NULL) {
979 ldb_module_oom(ac->module);
980 talloc_free(dn_list);
981 return LDB_ERR_OPERATIONS_ERROR;
983 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
984 if (dn_list->dn[0].data == NULL) {
985 ldb_module_oom(ac->module);
986 talloc_free(dn_list);
987 return LDB_ERR_OPERATIONS_ERROR;
989 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
990 dn_list->count = 1;
991 break;
993 case LDB_SCOPE_ONELEVEL:
994 if (!ltdb->cache->one_level_indexes) {
995 talloc_free(dn_list);
996 return LDB_ERR_OPERATIONS_ERROR;
998 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
999 if (ret != LDB_SUCCESS) {
1000 talloc_free(dn_list);
1001 return ret;
1003 break;
1005 case LDB_SCOPE_SUBTREE:
1006 case LDB_SCOPE_DEFAULT:
1007 if (!ltdb->cache->attribute_indexes) {
1008 talloc_free(dn_list);
1009 return LDB_ERR_OPERATIONS_ERROR;
1011 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1012 if (ret != LDB_SUCCESS) {
1013 talloc_free(dn_list);
1014 return ret;
1016 ltdb_dn_list_remove_duplicates(dn_list);
1017 break;
1020 ret = ltdb_index_filter(dn_list, ac, match_count);
1021 talloc_free(dn_list);
1022 return ret;
1026 add an index entry for one message element
1028 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1029 struct ldb_message_element *el, int v_idx)
1031 struct ldb_context *ldb;
1032 struct ldb_dn *dn_key;
1033 int ret;
1034 const struct ldb_schema_attribute *a;
1035 struct dn_list *list;
1037 ldb = ldb_module_get_ctx(module);
1039 list = talloc_zero(module, struct dn_list);
1040 if (list == NULL) {
1041 return LDB_ERR_OPERATIONS_ERROR;
1044 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1045 if (!dn_key) {
1046 talloc_free(list);
1047 return LDB_ERR_OPERATIONS_ERROR;
1049 talloc_steal(list, dn_key);
1051 ret = ltdb_dn_list_load(module, dn_key, list);
1052 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1053 talloc_free(list);
1054 return ret;
1057 if (ltdb_dn_list_find_str(list, dn) != -1) {
1058 talloc_free(list);
1059 return LDB_SUCCESS;
1062 if (list->count > 0 &&
1063 a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1064 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1067 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count+1);
1068 if (list->dn == NULL) {
1069 talloc_free(list);
1070 return LDB_ERR_OPERATIONS_ERROR;
1072 list->dn[list->count].data = discard_const_p(unsigned char, dn);
1073 list->dn[list->count].length = strlen(dn);
1074 list->count++;
1076 ret = ltdb_dn_list_store(module, dn_key, list);
1078 talloc_free(list);
1080 return ret;
1084 add index entries for one elements in a message
1086 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1087 struct ldb_message_element *el)
1089 unsigned int i;
1090 for (i = 0; i < el->num_values; i++) {
1091 int ret = ltdb_index_add1(module, dn, el, i);
1092 if (ret != LDB_SUCCESS) {
1093 return ret;
1097 return LDB_SUCCESS;
1101 add index entries for all elements in a message
1103 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1104 struct ldb_message_element *elements, int num_el)
1106 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1107 unsigned int i;
1109 if (dn[0] == '@') {
1110 return LDB_SUCCESS;
1113 if (ltdb->cache->indexlist->num_elements == 0) {
1114 /* no indexed fields */
1115 return LDB_SUCCESS;
1118 for (i = 0; i < num_el; i++) {
1119 int ret;
1120 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1121 continue;
1123 ret = ltdb_index_add_el(module, dn, &elements[i]);
1124 if (ret != LDB_SUCCESS) {
1125 return ret;
1129 return LDB_SUCCESS;
1134 insert a one level index for a message
1136 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1138 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1139 struct ldb_message_element el;
1140 struct ldb_val val;
1141 struct ldb_dn *pdn;
1142 const char *dn;
1143 int ret;
1145 /* We index for ONE Level only if requested */
1146 if (!ltdb->cache->one_level_indexes) {
1147 return LDB_SUCCESS;
1150 pdn = ldb_dn_get_parent(module, msg->dn);
1151 if (pdn == NULL) {
1152 return LDB_ERR_OPERATIONS_ERROR;
1155 dn = ldb_dn_get_linearized(msg->dn);
1156 if (dn == NULL) {
1157 talloc_free(pdn);
1158 return LDB_ERR_OPERATIONS_ERROR;
1161 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1162 if (val.data == NULL) {
1163 talloc_free(pdn);
1164 return LDB_ERR_OPERATIONS_ERROR;
1167 val.length = strlen((char *)val.data);
1168 el.name = LTDB_IDXONE;
1169 el.values = &val;
1170 el.num_values = 1;
1172 if (add) {
1173 ret = ltdb_index_add1(module, dn, &el, 0);
1174 } else { /* delete */
1175 ret = ltdb_index_del_value(module, dn, &el, 0);
1178 talloc_free(pdn);
1180 return ret;
1184 add the index entries for a new element in a record
1185 The caller guarantees that these element values are not yet indexed
1187 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn,
1188 struct ldb_message_element *el)
1190 if (ldb_dn_is_special(dn)) {
1191 return LDB_SUCCESS;
1193 return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1197 add the index entries for a new record
1199 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1201 const char *dn;
1202 int ret;
1204 if (ldb_dn_is_special(msg->dn)) {
1205 return LDB_SUCCESS;
1208 dn = ldb_dn_get_linearized(msg->dn);
1209 if (dn == NULL) {
1210 return LDB_ERR_OPERATIONS_ERROR;
1213 ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1214 if (ret != LDB_SUCCESS) {
1215 return ret;
1218 return ltdb_index_onelevel(module, msg, 1);
1223 delete an index entry for one message element
1225 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1226 struct ldb_message_element *el, int v_idx)
1228 struct ldb_context *ldb;
1229 struct ldb_dn *dn_key;
1230 int ret, i;
1231 struct dn_list *list;
1233 ldb = ldb_module_get_ctx(module);
1235 if (dn[0] == '@') {
1236 return LDB_SUCCESS;
1239 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1240 if (!dn_key) {
1241 return LDB_ERR_OPERATIONS_ERROR;
1244 list = talloc_zero(dn_key, struct dn_list);
1245 if (list == NULL) {
1246 talloc_free(dn_key);
1247 return LDB_ERR_OPERATIONS_ERROR;
1250 ret = ltdb_dn_list_load(module, dn_key, list);
1251 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1252 /* it wasn't indexed. Did we have an earlier error? If we did then
1253 its gone now */
1254 talloc_free(dn_key);
1255 return LDB_SUCCESS;
1258 if (ret != LDB_SUCCESS) {
1259 talloc_free(dn_key);
1260 return ret;
1263 i = ltdb_dn_list_find_str(list, dn);
1264 if (i == -1) {
1265 /* nothing to delete */
1266 talloc_free(dn_key);
1267 return LDB_SUCCESS;
1270 if (i != list->count-1) {
1271 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1273 list->count--;
1274 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1276 ret = ltdb_dn_list_store(module, dn_key, list);
1278 talloc_free(dn_key);
1280 return ret;
1284 delete the index entries for a element
1285 return -1 on failure
1287 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1289 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1290 int ret;
1291 unsigned int i;
1293 if (!ltdb->cache->attribute_indexes) {
1294 /* no indexed fields */
1295 return LDB_SUCCESS;
1298 if (dn[0] == '@') {
1299 return LDB_SUCCESS;
1302 if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1303 return LDB_SUCCESS;
1305 for (i = 0; i < el->num_values; i++) {
1306 ret = ltdb_index_del_value(module, dn, el, i);
1307 if (ret != LDB_SUCCESS) {
1308 return ret;
1312 return LDB_SUCCESS;
1316 delete the index entries for a record
1317 return -1 on failure
1319 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1321 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1322 int ret;
1323 const char *dn;
1324 unsigned int i;
1326 if (ldb_dn_is_special(msg->dn)) {
1327 return LDB_SUCCESS;
1330 ret = ltdb_index_onelevel(module, msg, 0);
1331 if (ret != LDB_SUCCESS) {
1332 return ret;
1335 if (!ltdb->cache->attribute_indexes) {
1336 /* no indexed fields */
1337 return LDB_SUCCESS;
1340 dn = ldb_dn_get_linearized(msg->dn);
1341 if (dn == NULL) {
1342 return LDB_ERR_OPERATIONS_ERROR;
1345 for (i = 0; i < msg->num_elements; i++) {
1346 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1347 if (ret != LDB_SUCCESS) {
1348 return ret;
1352 return LDB_SUCCESS;
1357 traversal function that deletes all @INDEX records
1359 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1361 const char *dn = "DN=" LTDB_INDEX ":";
1362 if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1363 return tdb_delete(tdb, key);
1365 return 0;
1369 traversal function that adds @INDEX records during a re index
1371 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1373 struct ldb_context *ldb;
1374 struct ldb_module *module = (struct ldb_module *)state;
1375 struct ldb_message *msg;
1376 const char *dn = NULL;
1377 int ret;
1378 TDB_DATA key2;
1380 ldb = ldb_module_get_ctx(module);
1382 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1383 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1384 return 0;
1387 msg = talloc(module, struct ldb_message);
1388 if (msg == NULL) {
1389 return -1;
1392 ret = ltdb_unpack_data(module, &data, msg);
1393 if (ret != 0) {
1394 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1395 ldb_dn_get_linearized(msg->dn));
1396 talloc_free(msg);
1397 return -1;
1400 /* check if the DN key has changed, perhaps due to the
1401 case insensitivity of an element changing */
1402 key2 = ltdb_key(module, msg->dn);
1403 if (key2.dptr == NULL) {
1404 /* probably a corrupt record ... darn */
1405 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1406 ldb_dn_get_linearized(msg->dn));
1407 talloc_free(msg);
1408 return 0;
1410 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1411 tdb_delete(tdb, key);
1412 tdb_store(tdb, key2, data, 0);
1414 talloc_free(key2.dptr);
1416 if (msg->dn == NULL) {
1417 dn = (char *)key.dptr + 3;
1418 } else {
1419 dn = ldb_dn_get_linearized(msg->dn);
1422 ret = ltdb_index_onelevel(module, msg, 1);
1423 if (ret != LDB_SUCCESS) {
1424 ldb_debug(ldb, LDB_DEBUG_ERROR,
1425 "Adding special ONE LEVEL index failed (%s)!",
1426 ldb_dn_get_linearized(msg->dn));
1427 talloc_free(msg);
1428 return -1;
1431 ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1433 talloc_free(msg);
1435 if (ret != LDB_SUCCESS) return -1;
1437 return 0;
1441 force a complete reindex of the database
1443 int ltdb_reindex(struct ldb_module *module)
1445 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1446 int ret;
1448 if (ltdb_cache_reload(module) != 0) {
1449 return LDB_ERR_OPERATIONS_ERROR;
1452 /* first traverse the database deleting any @INDEX records */
1453 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1454 if (ret == -1) {
1455 return LDB_ERR_OPERATIONS_ERROR;
1458 /* if we don't have indexes we have nothing todo */
1459 if (ltdb->cache->indexlist->num_elements == 0) {
1460 return LDB_SUCCESS;
1463 /* now traverse adding any indexes for normal LDB records */
1464 ret = tdb_traverse(ltdb->tdb, re_index, module);
1465 if (ret == -1) {
1466 return LDB_ERR_OPERATIONS_ERROR;
1469 if (ltdb->idxptr) {
1470 ltdb->idxptr->repack = true;
1473 return LDB_SUCCESS;