s4-ldb: fixed re-index during a complex transaction
[Samba/cd1.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
blob9faba397d02e6538123e0ebc8d4a4d7bd9587a2f
1 /*
2 ldb database library
4 Copyright (C) Andrew Tridgell 2004-2009
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 * Name: ldb
27 * Component: ldb tdb backend - indexing
29 * Description: indexing routines for ldb tdb backend
31 * Author: Andrew Tridgell
34 #include "ldb_tdb.h"
/*
 * A list of DN values, as stored in (or loaded from) an index record.
 */
struct dn_list {
	/* number of valid entries in dn[] */
	unsigned int count;
	/* array of DN values; NULL when the list is empty */
	struct ldb_val *dn;
};
/*
 * Per-transaction indexing state.
 */
struct ltdb_idxptr {
	/* in-memory tdb holding pointers to dn_list structures, opened lazily */
	struct tdb_context *itdb;
	/* first error recorded while writing the in-memory index back out */
	int error;
};
/*
 * Each index entry carries an @IDXVERSION attribute, which lets us tell
 * whether the entry was written by an older version of the indexing code.
 */
#define LTDB_INDEXING_VERSION 2
51 /* enable the idxptr mode when transactions start */
52 int ltdb_index_transaction_start(struct ldb_module *module)
54 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
55 ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
56 return LDB_SUCCESS;
59 /* compare two DN entries in a dn_list. Take account of possible
60 * differences in string termination */
61 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
63 int ret = strncmp((char *)v1->data, (char *)v2->data, v1->length);
64 if (ret != 0) return ret;
65 if (v2->length > v1->length && v2->data[v1->length] != 0) {
66 return 1;
68 return 0;
73 find a entry in a dn_list, using a ldb_val. Uses a case sensitive
74 comparison with the dn returns -1 if not found
76 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 int i;
79 for (i=0; i<list->count; i++) {
80 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82 return -1;
86 find a entry in a dn_list. Uses a case sensitive comparison with the dn
87 returns -1 if not found
89 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 struct ldb_val v;
92 v.data = discard_const_p(unsigned char, dn);
93 v.length = strlen(dn);
94 return ltdb_dn_list_find_val(list, &v);
97 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
99 struct dn_list *list;
100 if (rec.dsize != sizeof(void *)) {
101 ldb_asprintf_errstring(ldb_module_get_ctx(module),
102 "Bad data size for idxptr %u", (unsigned)rec.dsize);
103 return NULL;
106 list = talloc_get_type(*(struct dn_list **)rec.dptr, struct dn_list);
107 if (list == NULL) {
108 ldb_asprintf_errstring(ldb_module_get_ctx(module),
109 "Bad type '%s' for idxptr",
110 talloc_get_name(*(struct dn_list **)rec.dptr));
111 return NULL;
113 if (check_parent && list->dn && talloc_parent(list->dn) != list) {
114 ldb_asprintf_errstring(ldb_module_get_ctx(module),
115 "Bad parent '%s' for idxptr",
116 talloc_get_name(talloc_parent(list->dn)));
117 return NULL;
119 return list;
123 return the @IDX list in an index entry for a dn as a
124 struct dn_list
126 static int ltdb_dn_list_load(struct ldb_module *module,
127 struct ldb_dn *dn, struct dn_list *list)
129 struct ldb_message *msg;
130 int ret;
131 struct ldb_message_element *el;
132 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
133 TDB_DATA rec;
134 struct dn_list *list2;
135 TDB_DATA key;
137 list->dn = NULL;
138 list->count = 0;
140 /* see if we have any in-memory index entries */
141 if (ltdb->idxptr == NULL ||
142 ltdb->idxptr->itdb == NULL) {
143 goto normal_index;
146 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
147 key.dsize = strlen((char *)key.dptr);
149 rec = tdb_fetch(ltdb->idxptr->itdb, key);
150 if (rec.dptr == NULL) {
151 goto normal_index;
154 /* we've found an in-memory index entry */
155 list2 = ltdb_index_idxptr(module, rec, true);
156 if (list2 == NULL) {
157 free(rec.dptr);
158 return LDB_ERR_OPERATIONS_ERROR;
160 free(rec.dptr);
162 *list = *list2;
163 return LDB_SUCCESS;
165 normal_index:
166 msg = ldb_msg_new(list);
167 if (msg == NULL) {
168 return LDB_ERR_OPERATIONS_ERROR;
171 ret = ltdb_search_dn1(module, dn, msg);
172 if (ret != LDB_SUCCESS) {
173 return ret;
176 /* TODO: check indexing version number */
178 el = ldb_msg_find_element(msg, LTDB_IDX);
179 if (!el) {
180 talloc_free(msg);
181 return LDB_SUCCESS;
184 /* we avoid copying the strings by stealing the list */
185 list->dn = talloc_steal(list, el->values);
186 list->count = el->num_values;
188 return LDB_SUCCESS;
193 save a dn_list into a full @IDX style record
195 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn,
196 struct dn_list *list)
198 struct ldb_message *msg;
199 int ret;
201 if (list->count == 0) {
202 ret = ltdb_delete_noindex(module, dn);
203 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
204 return LDB_SUCCESS;
206 return ret;
209 msg = ldb_msg_new(module);
210 if (!msg) {
211 ldb_module_oom(module);
212 return LDB_ERR_OPERATIONS_ERROR;
215 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
216 if (ret != LDB_SUCCESS) {
217 talloc_free(msg);
218 ldb_module_oom(module);
219 return LDB_ERR_OPERATIONS_ERROR;
222 msg->dn = dn;
223 if (list->count > 0) {
224 struct ldb_message_element *el;
226 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
227 if (ret != LDB_SUCCESS) {
228 ldb_module_oom(module);
229 talloc_free(msg);
230 return LDB_ERR_OPERATIONS_ERROR;
232 el->values = list->dn;
233 el->num_values = list->count;
236 ret = ltdb_store(module, msg, TDB_REPLACE);
237 talloc_free(msg);
238 return ret;
242 save a dn_list into the database, in either @IDX or internal format
244 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
245 struct dn_list *list)
247 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
248 TDB_DATA rec, key;
249 int ret;
250 struct dn_list *list2;
252 if (ltdb->idxptr == NULL) {
253 return ltdb_dn_list_store_full(module, dn, list);
256 if (ltdb->idxptr->itdb == NULL) {
257 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
258 if (ltdb->idxptr->itdb == NULL) {
259 return LDB_ERR_OPERATIONS_ERROR;
263 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
264 key.dsize = strlen((char *)key.dptr);
266 rec = tdb_fetch(ltdb->idxptr->itdb, key);
267 if (rec.dptr != NULL) {
268 list2 = ltdb_index_idxptr(module, rec, false);
269 if (list2 == NULL) {
270 free(rec.dptr);
271 return LDB_ERR_OPERATIONS_ERROR;
273 free(rec.dptr);
274 list2->dn = talloc_steal(list2, list->dn);
275 list2->count = list->count;
276 return LDB_SUCCESS;
279 list2 = talloc(ltdb->idxptr, struct dn_list);
280 if (list2 == NULL) {
281 return LDB_ERR_OPERATIONS_ERROR;
283 list2->dn = talloc_steal(list2, list->dn);
284 list2->count = list->count;
286 rec.dptr = (uint8_t *)&list2;
287 rec.dsize = sizeof(void *);
289 ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
290 if (ret == -1) {
291 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
293 return LDB_SUCCESS;
297 traverse function for storing the in-memory index entries on disk
299 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
301 struct ldb_module *module = state;
302 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
303 struct ldb_dn *dn;
304 struct ldb_context *ldb = ldb_module_get_ctx(module);
305 struct ldb_val v;
306 struct dn_list *list;
308 list = ltdb_index_idxptr(module, data, true);
309 if (list == NULL) {
310 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
311 return -1;
314 v.data = key.dptr;
315 v.length = key.dsize;
317 dn = ldb_dn_from_ldb_val(module, ldb, &v);
318 if (dn == NULL) {
319 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
320 return -1;
323 ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
324 talloc_free(dn);
325 if (ltdb->idxptr->error != 0) {
326 return -1;
328 return 0;
331 /* cleanup the idxptr mode when transaction commits */
332 int ltdb_index_transaction_commit(struct ldb_module *module)
334 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
335 int ret;
337 if (ltdb->idxptr->itdb) {
338 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
339 tdb_close(ltdb->idxptr->itdb);
342 ret = ltdb->idxptr->error;
344 if (ret != LDB_SUCCESS) {
345 struct ldb_context *ldb = ldb_module_get_ctx(module);
346 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit");
349 talloc_free(ltdb->idxptr);
350 ltdb->idxptr = NULL;
351 return ret;
354 /* cleanup the idxptr mode when transaction cancels */
355 int ltdb_index_transaction_cancel(struct ldb_module *module)
357 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
358 if (ltdb->idxptr && ltdb->idxptr->itdb) {
359 tdb_close(ltdb->idxptr->itdb);
361 talloc_free(ltdb->idxptr);
362 ltdb->idxptr = NULL;
363 return LDB_SUCCESS;
368 return the dn key to be used for an index
369 the caller is responsible for freeing
371 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
372 const char *attr, const struct ldb_val *value,
373 const struct ldb_schema_attribute **ap)
375 struct ldb_dn *ret;
376 struct ldb_val v;
377 const struct ldb_schema_attribute *a;
378 char *attr_folded;
379 int r;
381 attr_folded = ldb_attr_casefold(ldb, attr);
382 if (!attr_folded) {
383 return NULL;
386 a = ldb_schema_attribute_by_name(ldb, attr);
387 if (ap) {
388 *ap = a;
390 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
391 if (r != LDB_SUCCESS) {
392 const char *errstr = ldb_errstring(ldb);
393 /* canonicalisation can be refused. For example,
394 a attribute that takes wildcards will refuse to canonicalise
395 if the value contains a wildcard */
396 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
397 attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
398 talloc_free(attr_folded);
399 return NULL;
401 if (ldb_should_b64_encode(ldb, &v)) {
402 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
403 if (!vstr) return NULL;
404 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
405 talloc_free(vstr);
406 } else {
407 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
410 if (v.data != value->data) {
411 talloc_free(v.data);
413 talloc_free(attr_folded);
415 return ret;
419 see if a attribute value is in the list of indexed attributes
421 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
423 unsigned int i;
424 struct ldb_message_element *el;
426 el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
427 if (el == NULL) {
428 return false;
431 /* TODO: this is too expensive! At least use a binary search */
432 for (i=0; i<el->num_values; i++) {
433 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
434 return true;
437 return false;
/*
  In the following logic functions, the return value is treated as
  follows:

     LDB_SUCCESS: we found some matching index values

     LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches

     LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
                               we'll need a full search
 */
453 return a list of dn's that might match a simple indexed search (an
454 equality search only)
456 static int ltdb_index_dn_simple(struct ldb_module *module,
457 const struct ldb_parse_tree *tree,
458 const struct ldb_message *index_list,
459 struct dn_list *list)
461 struct ldb_context *ldb;
462 struct ldb_dn *dn;
463 int ret;
465 ldb = ldb_module_get_ctx(module);
467 list->count = 0;
468 list->dn = NULL;
470 /* if the attribute isn't in the list of indexed attributes then
471 this node needs a full search */
472 if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
473 return LDB_ERR_OPERATIONS_ERROR;
476 /* the attribute is indexed. Pull the list of DNs that match the
477 search criterion */
478 dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
479 if (!dn) return LDB_ERR_OPERATIONS_ERROR;
481 ret = ltdb_dn_list_load(module, dn, list);
482 talloc_free(dn);
483 return ret;
487 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
490 return a list of dn's that might match a leaf indexed search
492 static int ltdb_index_dn_leaf(struct ldb_module *module,
493 const struct ldb_parse_tree *tree,
494 const struct ldb_message *index_list,
495 struct dn_list *list)
497 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
498 list->dn = talloc_array(list, struct ldb_val, 1);
499 if (list->dn == NULL) {
500 ldb_module_oom(module);
501 return LDB_ERR_OPERATIONS_ERROR;
503 list->dn[0] = tree->u.equality.value;
504 list->count = 1;
505 return LDB_SUCCESS;
507 return ltdb_index_dn_simple(module, tree, index_list, list);
512 list intersection
513 list = list & list2
515 static bool list_intersect(struct ldb_context *ldb,
516 struct dn_list *list, const struct dn_list *list2)
518 struct dn_list *list3;
519 unsigned int i;
521 if (list->count == 0) {
522 /* 0 & X == 0 */
523 return true;
525 if (list2->count == 0) {
526 /* X & 0 == 0 */
527 list->count = 0;
528 list->dn = NULL;
529 return true;
532 /* the indexing code is allowed to return a longer list than
533 what really matches, as all results are filtered by the
534 full expression at the end - this shortcut avoids a lot of
535 work in some cases */
536 if (list->count < 2 && list2->count > 10) {
537 return true;
539 if (list2->count < 2 && list->count > 10) {
540 list->count = list2->count;
541 list->dn = list2->dn;
542 /* note that list2 may not be the parent of list2->dn,
543 as list2->dn may be owned by ltdb->idxptr. In that
544 case we expect this reparent call to fail, which is
545 OK */
546 talloc_reparent(list2, list, list2->dn);
547 return true;
550 list3 = talloc_zero(list, struct dn_list);
551 if (list3 == NULL) {
552 return false;
555 list3->dn = talloc_array(list3, struct ldb_val, list->count);
556 if (!list3->dn) {
557 talloc_free(list3);
558 return false;
560 list3->count = 0;
562 for (i=0;i<list->count;i++) {
563 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
564 list3->dn[list3->count] = list->dn[i];
565 list3->count++;
569 list->dn = talloc_steal(list, list3->dn);
570 list->count = list3->count;
571 talloc_free(list3);
573 return true;
578 list union
579 list = list | list2
581 static bool list_union(struct ldb_context *ldb,
582 struct dn_list *list, const struct dn_list *list2)
584 struct ldb_val *dn3;
586 if (list2->count == 0) {
587 /* X | 0 == X */
588 return true;
591 if (list->count == 0) {
592 /* 0 | X == X */
593 list->count = list2->count;
594 list->dn = list2->dn;
595 /* note that list2 may not be the parent of list2->dn,
596 as list2->dn may be owned by ltdb->idxptr. In that
597 case we expect this reparent call to fail, which is
598 OK */
599 talloc_reparent(list2, list, list2->dn);
600 return true;
603 dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
604 if (!dn3) {
605 ldb_oom(ldb);
606 return false;
609 /* we allow for duplicates here, and get rid of them later */
610 memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
611 memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
613 list->dn = dn3;
614 list->count += list2->count;
616 return true;
619 static int ltdb_index_dn(struct ldb_module *module,
620 const struct ldb_parse_tree *tree,
621 const struct ldb_message *index_list,
622 struct dn_list *list);
626 process an OR list (a union)
628 static int ltdb_index_dn_or(struct ldb_module *module,
629 const struct ldb_parse_tree *tree,
630 const struct ldb_message *index_list,
631 struct dn_list *list)
633 struct ldb_context *ldb;
634 unsigned int i;
636 ldb = ldb_module_get_ctx(module);
638 list->dn = NULL;
639 list->count = 0;
641 for (i=0; i<tree->u.list.num_elements; i++) {
642 struct dn_list *list2;
643 int ret;
645 list2 = talloc_zero(list, struct dn_list);
646 if (list2 == NULL) {
647 return LDB_ERR_OPERATIONS_ERROR;
650 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
652 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
653 /* X || 0 == X */
654 talloc_free(list2);
655 continue;
658 if (ret != LDB_SUCCESS) {
659 /* X || * == * */
660 talloc_free(list2);
661 return ret;
664 if (!list_union(ldb, list, list2)) {
665 talloc_free(list2);
666 return LDB_ERR_OPERATIONS_ERROR;
670 if (list->count == 0) {
671 return LDB_ERR_NO_SUCH_OBJECT;
674 return LDB_SUCCESS;
679 NOT an index results
681 static int ltdb_index_dn_not(struct ldb_module *module,
682 const struct ldb_parse_tree *tree,
683 const struct ldb_message *index_list,
684 struct dn_list *list)
686 /* the only way to do an indexed not would be if we could
687 negate the not via another not or if we knew the total
688 number of database elements so we could know that the
689 existing expression covered the whole database.
691 instead, we just give up, and rely on a full index scan
692 (unless an outer & manages to reduce the list)
694 return LDB_ERR_OPERATIONS_ERROR;
698 static bool ltdb_index_unique(struct ldb_context *ldb,
699 const char *attr)
701 const struct ldb_schema_attribute *a;
702 a = ldb_schema_attribute_by_name(ldb, attr);
703 if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
704 return true;
706 return false;
710 process an AND expression (intersection)
712 static int ltdb_index_dn_and(struct ldb_module *module,
713 const struct ldb_parse_tree *tree,
714 const struct ldb_message *index_list,
715 struct dn_list *list)
717 struct ldb_context *ldb;
718 unsigned int i;
719 bool found;
721 ldb = ldb_module_get_ctx(module);
723 list->dn = NULL;
724 list->count = 0;
726 /* in the first pass we only look for unique simple
727 equality tests, in the hope of avoiding having to look
728 at any others */
729 for (i=0; i<tree->u.list.num_elements; i++) {
730 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
731 int ret;
733 if (subtree->operation != LDB_OP_EQUALITY ||
734 !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
735 continue;
738 ret = ltdb_index_dn(module, subtree, index_list, list);
739 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
740 /* 0 && X == 0 */
741 return LDB_ERR_NO_SUCH_OBJECT;
743 if (ret == LDB_SUCCESS) {
744 /* a unique index match means we can
745 * stop. Note that we don't care if we return
746 * a few too many objects, due to later
747 * filtering */
748 return LDB_SUCCESS;
752 /* now do a full intersection */
753 found = false;
755 for (i=0; i<tree->u.list.num_elements; i++) {
756 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
757 struct dn_list *list2;
758 int ret;
760 list2 = talloc_zero(list, struct dn_list);
761 if (list2 == NULL) {
762 ldb_module_oom(module);
763 return LDB_ERR_OPERATIONS_ERROR;
766 ret = ltdb_index_dn(module, subtree, index_list, list2);
768 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
769 /* X && 0 == 0 */
770 list->dn = NULL;
771 list->count = 0;
772 talloc_free(list2);
773 return LDB_ERR_NO_SUCH_OBJECT;
776 if (ret != LDB_SUCCESS) {
777 /* this didn't adding anything */
778 talloc_free(list2);
779 continue;
782 if (!found) {
783 talloc_reparent(list2, list, list->dn);
784 list->dn = list2->dn;
785 list->count = list2->count;
786 found = true;
787 } else if (!list_intersect(ldb, list, list2)) {
788 talloc_free(list2);
789 return LDB_ERR_OPERATIONS_ERROR;
792 if (list->count == 0) {
793 list->dn = NULL;
794 return LDB_ERR_NO_SUCH_OBJECT;
797 if (list->count < 2) {
798 /* it isn't worth loading the next part of the tree */
799 return LDB_SUCCESS;
803 if (!found) {
804 /* none of the attributes were indexed */
805 return LDB_ERR_OPERATIONS_ERROR;
808 return LDB_SUCCESS;
812 return a list of matching objects using a one-level index
814 static int ltdb_index_dn_one(struct ldb_module *module,
815 struct ldb_dn *parent_dn,
816 struct dn_list *list)
818 struct ldb_context *ldb;
819 struct ldb_dn *key;
820 struct ldb_val val;
821 int ret;
823 ldb = ldb_module_get_ctx(module);
825 /* work out the index key from the parent DN */
826 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
827 val.length = strlen((char *)val.data);
828 key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
829 if (!key) {
830 ldb_oom(ldb);
831 return LDB_ERR_OPERATIONS_ERROR;
834 ret = ltdb_dn_list_load(module, key, list);
835 talloc_free(key);
836 if (ret != LDB_SUCCESS) {
837 return ret;
840 if (list->count == 0) {
841 return LDB_ERR_NO_SUCH_OBJECT;
844 return LDB_SUCCESS;
848 return a list of dn's that might match a indexed search or
849 an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
851 static int ltdb_index_dn(struct ldb_module *module,
852 const struct ldb_parse_tree *tree,
853 const struct ldb_message *index_list,
854 struct dn_list *list)
856 int ret = LDB_ERR_OPERATIONS_ERROR;
858 switch (tree->operation) {
859 case LDB_OP_AND:
860 ret = ltdb_index_dn_and(module, tree, index_list, list);
861 break;
863 case LDB_OP_OR:
864 ret = ltdb_index_dn_or(module, tree, index_list, list);
865 break;
867 case LDB_OP_NOT:
868 ret = ltdb_index_dn_not(module, tree, index_list, list);
869 break;
871 case LDB_OP_EQUALITY:
872 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
873 break;
875 case LDB_OP_SUBSTRING:
876 case LDB_OP_GREATER:
877 case LDB_OP_LESS:
878 case LDB_OP_PRESENT:
879 case LDB_OP_APPROX:
880 case LDB_OP_EXTENDED:
881 /* we can't index with fancy bitops yet */
882 ret = LDB_ERR_OPERATIONS_ERROR;
883 break;
886 return ret;
890 filter a candidate dn_list from an indexed search into a set of results
891 extracting just the given attributes
893 static int ltdb_index_filter(const struct dn_list *dn_list,
894 struct ltdb_context *ac,
895 uint32_t *match_count)
897 struct ldb_context *ldb;
898 struct ldb_message *msg;
899 unsigned int i;
901 ldb = ldb_module_get_ctx(ac->module);
903 for (i = 0; i < dn_list->count; i++) {
904 struct ldb_dn *dn;
905 int ret;
907 msg = ldb_msg_new(ac);
908 if (!msg) {
909 return LDB_ERR_OPERATIONS_ERROR;
912 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
913 if (dn == NULL) {
914 talloc_free(msg);
915 return LDB_ERR_OPERATIONS_ERROR;
918 ret = ltdb_search_dn1(ac->module, dn, msg);
919 talloc_free(dn);
920 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
921 /* the record has disappeared? yes, this can happen */
922 talloc_free(msg);
923 continue;
926 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
927 /* an internal error */
928 talloc_free(msg);
929 return LDB_ERR_OPERATIONS_ERROR;
932 if (!ldb_match_msg(ldb, msg,
933 ac->tree, ac->base, ac->scope)) {
934 talloc_free(msg);
935 continue;
938 /* filter the attributes that the user wants */
939 ret = ltdb_filter_attrs(msg, ac->attrs);
941 if (ret == -1) {
942 talloc_free(msg);
943 return LDB_ERR_OPERATIONS_ERROR;
946 ret = ldb_module_send_entry(ac->req, msg, NULL);
947 if (ret != LDB_SUCCESS) {
948 ac->request_terminated = true;
949 return ret;
952 (*match_count)++;
955 return LDB_SUCCESS;
959 remove any duplicated entries in a indexed result
961 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
963 int i, new_count;
965 if (list->count < 2) {
966 return;
969 qsort(list->dn, list->count, sizeof(struct ldb_val), (comparison_fn_t) dn_list_cmp);
971 new_count = 1;
972 for (i=1; i<list->count; i++) {
973 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
974 if (new_count != i) {
975 list->dn[new_count] = list->dn[i];
977 new_count++;
981 list->count = new_count;
985 search the database with a LDAP-like expression using indexes
986 returns -1 if an indexed search is not possible, in which
987 case the caller should call ltdb_search_full()
989 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
991 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
992 struct dn_list *dn_list;
993 int ret;
995 /* see if indexing is enabled */
996 if (!ltdb->cache->attribute_indexes &&
997 !ltdb->cache->one_level_indexes &&
998 ac->scope != LDB_SCOPE_BASE) {
999 /* fallback to a full search */
1000 return LDB_ERR_OPERATIONS_ERROR;
1003 dn_list = talloc_zero(ac, struct dn_list);
1004 if (dn_list == NULL) {
1005 ldb_module_oom(ac->module);
1006 return LDB_ERR_OPERATIONS_ERROR;
1009 switch (ac->scope) {
1010 case LDB_SCOPE_BASE:
1011 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1012 if (dn_list->dn == NULL) {
1013 ldb_module_oom(ac->module);
1014 talloc_free(dn_list);
1015 return LDB_ERR_OPERATIONS_ERROR;
1017 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1018 if (dn_list->dn[0].data == NULL) {
1019 ldb_module_oom(ac->module);
1020 talloc_free(dn_list);
1021 return LDB_ERR_OPERATIONS_ERROR;
1023 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1024 dn_list->count = 1;
1025 break;
1027 case LDB_SCOPE_ONELEVEL:
1028 if (!ltdb->cache->one_level_indexes) {
1029 talloc_free(dn_list);
1030 return LDB_ERR_OPERATIONS_ERROR;
1032 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1033 if (ret != LDB_SUCCESS) {
1034 talloc_free(dn_list);
1035 return ret;
1037 break;
1039 case LDB_SCOPE_SUBTREE:
1040 case LDB_SCOPE_DEFAULT:
1041 if (!ltdb->cache->attribute_indexes) {
1042 talloc_free(dn_list);
1043 return LDB_ERR_OPERATIONS_ERROR;
1045 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1046 if (ret != LDB_SUCCESS) {
1047 talloc_free(dn_list);
1048 return ret;
1050 ltdb_dn_list_remove_duplicates(dn_list);
1051 break;
1054 ret = ltdb_index_filter(dn_list, ac, match_count);
1055 talloc_free(dn_list);
1056 return ret;
1060 add an index entry for one message element
1062 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1063 struct ldb_message_element *el, int v_idx)
1065 struct ldb_context *ldb;
1066 struct ldb_dn *dn_key;
1067 int ret;
1068 const struct ldb_schema_attribute *a;
1069 struct dn_list *list;
1070 unsigned alloc_len;
1072 ldb = ldb_module_get_ctx(module);
1074 list = talloc_zero(module, struct dn_list);
1075 if (list == NULL) {
1076 return LDB_ERR_OPERATIONS_ERROR;
1079 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1080 if (!dn_key) {
1081 talloc_free(list);
1082 return LDB_ERR_OPERATIONS_ERROR;
1084 talloc_steal(list, dn_key);
1086 ret = ltdb_dn_list_load(module, dn_key, list);
1087 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1088 talloc_free(list);
1089 return ret;
1092 if (ltdb_dn_list_find_str(list, dn) != -1) {
1093 talloc_free(list);
1094 return LDB_SUCCESS;
1097 if (list->count > 0 &&
1098 a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1099 talloc_free(list);
1100 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1103 /* overallocate the list a bit, to reduce the number of
1104 * realloc trigered copies */
1105 alloc_len = ((list->count+1)+7) & ~7;
1106 list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1107 if (list->dn == NULL) {
1108 talloc_free(list);
1109 return LDB_ERR_OPERATIONS_ERROR;
1111 list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1112 list->dn[list->count].length = strlen(dn);
1113 list->count++;
1115 ret = ltdb_dn_list_store(module, dn_key, list);
1117 talloc_free(list);
1119 return ret;
1123 add index entries for one elements in a message
1125 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1126 struct ldb_message_element *el)
1128 unsigned int i;
1129 for (i = 0; i < el->num_values; i++) {
1130 int ret = ltdb_index_add1(module, dn, el, i);
1131 if (ret != LDB_SUCCESS) {
1132 return ret;
1136 return LDB_SUCCESS;
1140 add index entries for all elements in a message
1142 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1143 struct ldb_message_element *elements, int num_el)
1145 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1146 unsigned int i;
1148 if (dn[0] == '@') {
1149 return LDB_SUCCESS;
1152 if (ltdb->cache->indexlist->num_elements == 0) {
1153 /* no indexed fields */
1154 return LDB_SUCCESS;
1157 for (i = 0; i < num_el; i++) {
1158 int ret;
1159 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1160 continue;
1162 ret = ltdb_index_add_el(module, dn, &elements[i]);
1163 if (ret != LDB_SUCCESS) {
1164 return ret;
1168 return LDB_SUCCESS;
1173 insert a one level index for a message
1175 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1177 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1178 struct ldb_message_element el;
1179 struct ldb_val val;
1180 struct ldb_dn *pdn;
1181 const char *dn;
1182 int ret;
1184 /* We index for ONE Level only if requested */
1185 if (!ltdb->cache->one_level_indexes) {
1186 return LDB_SUCCESS;
1189 pdn = ldb_dn_get_parent(module, msg->dn);
1190 if (pdn == NULL) {
1191 return LDB_ERR_OPERATIONS_ERROR;
1194 dn = ldb_dn_get_linearized(msg->dn);
1195 if (dn == NULL) {
1196 talloc_free(pdn);
1197 return LDB_ERR_OPERATIONS_ERROR;
1200 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1201 if (val.data == NULL) {
1202 talloc_free(pdn);
1203 return LDB_ERR_OPERATIONS_ERROR;
1206 val.length = strlen((char *)val.data);
1207 el.name = LTDB_IDXONE;
1208 el.values = &val;
1209 el.num_values = 1;
1211 if (add) {
1212 ret = ltdb_index_add1(module, dn, &el, 0);
1213 } else { /* delete */
1214 ret = ltdb_index_del_value(module, dn, &el, 0);
1217 talloc_free(pdn);
1219 return ret;
1223 add the index entries for a new element in a record
1224 The caller guarantees that these element values are not yet indexed
1226 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn,
1227 struct ldb_message_element *el)
1229 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1230 if (ldb_dn_is_special(dn)) {
1231 return LDB_SUCCESS;
1233 if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1234 return LDB_SUCCESS;
1236 return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1240 add the index entries for a new record
1242 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1244 const char *dn;
1245 int ret;
1247 if (ldb_dn_is_special(msg->dn)) {
1248 return LDB_SUCCESS;
1251 dn = ldb_dn_get_linearized(msg->dn);
1252 if (dn == NULL) {
1253 return LDB_ERR_OPERATIONS_ERROR;
1256 ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1257 if (ret != LDB_SUCCESS) {
1258 return ret;
1261 return ltdb_index_onelevel(module, msg, 1);
1266 delete an index entry for one message element
1268 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1269 struct ldb_message_element *el, int v_idx)
1271 struct ldb_context *ldb;
1272 struct ldb_dn *dn_key;
1273 int ret, i;
1274 struct dn_list *list;
1276 ldb = ldb_module_get_ctx(module);
1278 if (dn[0] == '@') {
1279 return LDB_SUCCESS;
1282 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1283 if (!dn_key) {
1284 return LDB_ERR_OPERATIONS_ERROR;
1287 list = talloc_zero(dn_key, struct dn_list);
1288 if (list == NULL) {
1289 talloc_free(dn_key);
1290 return LDB_ERR_OPERATIONS_ERROR;
1293 ret = ltdb_dn_list_load(module, dn_key, list);
1294 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1295 /* it wasn't indexed. Did we have an earlier error? If we did then
1296 its gone now */
1297 talloc_free(dn_key);
1298 return LDB_SUCCESS;
1301 if (ret != LDB_SUCCESS) {
1302 talloc_free(dn_key);
1303 return ret;
1306 i = ltdb_dn_list_find_str(list, dn);
1307 if (i == -1) {
1308 /* nothing to delete */
1309 talloc_free(dn_key);
1310 return LDB_SUCCESS;
1313 if (i != list->count-1) {
1314 memmove(&list->dn[i], &list->dn[i+1], sizeof(list->dn[0])*(list->count - (i+1)));
1316 list->count--;
1317 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1319 ret = ltdb_dn_list_store(module, dn_key, list);
1321 talloc_free(dn_key);
1323 return ret;
1327 delete the index entries for a element
1328 return -1 on failure
1330 int ltdb_index_del_element(struct ldb_module *module, const char *dn, struct ldb_message_element *el)
1332 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1333 int ret;
1334 unsigned int i;
1336 if (!ltdb->cache->attribute_indexes) {
1337 /* no indexed fields */
1338 return LDB_SUCCESS;
1341 if (dn[0] == '@') {
1342 return LDB_SUCCESS;
1345 if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1346 return LDB_SUCCESS;
1348 for (i = 0; i < el->num_values; i++) {
1349 ret = ltdb_index_del_value(module, dn, el, i);
1350 if (ret != LDB_SUCCESS) {
1351 return ret;
1355 return LDB_SUCCESS;
1359 delete the index entries for a record
1360 return -1 on failure
1362 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1364 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1365 int ret;
1366 const char *dn;
1367 unsigned int i;
1369 if (ldb_dn_is_special(msg->dn)) {
1370 return LDB_SUCCESS;
1373 ret = ltdb_index_onelevel(module, msg, 0);
1374 if (ret != LDB_SUCCESS) {
1375 return ret;
1378 if (!ltdb->cache->attribute_indexes) {
1379 /* no indexed fields */
1380 return LDB_SUCCESS;
1383 dn = ldb_dn_get_linearized(msg->dn);
1384 if (dn == NULL) {
1385 return LDB_ERR_OPERATIONS_ERROR;
1388 for (i = 0; i < msg->num_elements; i++) {
1389 ret = ltdb_index_del_element(module, dn, &msg->elements[i]);
1390 if (ret != LDB_SUCCESS) {
1391 return ret;
1395 return LDB_SUCCESS;
1400 traversal function that deletes all @INDEX records
1402 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1404 struct ldb_module *module = state;
1405 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1406 const char *dnstr = "DN=" LTDB_INDEX ":";
1407 struct dn_list list;
1408 struct ldb_dn *dn;
1409 struct ldb_val v;
1410 int ret;
1412 if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1413 return 0;
1415 /* we need to put a empty list in the internal tdb for this
1416 * index entry */
1417 list.dn = NULL;
1418 list.count = 0;
1419 v.data = key.dptr;
1420 v.length = strlen((char *)key.dptr);
1422 dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1423 ret = ltdb_dn_list_store(module, dn, &list);
1424 if (ret != LDB_SUCCESS) {
1425 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1426 "Unable to store null index for %s\n",
1427 ldb_dn_get_linearized(dn));
1428 talloc_free(dn);
1429 return -1;
1431 talloc_free(dn);
1432 return 0;
1436 traversal function that adds @INDEX records during a re index
1438 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1440 struct ldb_context *ldb;
1441 struct ldb_module *module = (struct ldb_module *)state;
1442 struct ldb_message *msg;
1443 const char *dn = NULL;
1444 int ret;
1445 TDB_DATA key2;
1447 ldb = ldb_module_get_ctx(module);
1449 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1450 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1451 return 0;
1454 msg = ldb_msg_new(module);
1455 if (msg == NULL) {
1456 return -1;
1459 ret = ltdb_unpack_data(module, &data, msg);
1460 if (ret != 0) {
1461 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1462 ldb_dn_get_linearized(msg->dn));
1463 talloc_free(msg);
1464 return -1;
1467 /* check if the DN key has changed, perhaps due to the
1468 case insensitivity of an element changing */
1469 key2 = ltdb_key(module, msg->dn);
1470 if (key2.dptr == NULL) {
1471 /* probably a corrupt record ... darn */
1472 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1473 ldb_dn_get_linearized(msg->dn));
1474 talloc_free(msg);
1475 return 0;
1477 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1478 tdb_delete(tdb, key);
1479 tdb_store(tdb, key2, data, 0);
1481 talloc_free(key2.dptr);
1483 if (msg->dn == NULL) {
1484 dn = (char *)key.dptr + 3;
1485 } else {
1486 dn = ldb_dn_get_linearized(msg->dn);
1489 ret = ltdb_index_onelevel(module, msg, 1);
1490 if (ret != LDB_SUCCESS) {
1491 ldb_debug(ldb, LDB_DEBUG_ERROR,
1492 "Adding special ONE LEVEL index failed (%s)!",
1493 ldb_dn_get_linearized(msg->dn));
1494 talloc_free(msg);
1495 return -1;
1498 ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1500 talloc_free(msg);
1502 if (ret != LDB_SUCCESS) return -1;
1504 return 0;
1508 force a complete reindex of the database
1510 int ltdb_reindex(struct ldb_module *module)
1512 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1513 int ret;
1515 if (ltdb_cache_reload(module) != 0) {
1516 return LDB_ERR_OPERATIONS_ERROR;
1519 /* first traverse the database deleting any @INDEX records by
1520 * putting NULL entries in the in-memory tdb
1522 ret = tdb_traverse(ltdb->tdb, delete_index, module);
1523 if (ret == -1) {
1524 return LDB_ERR_OPERATIONS_ERROR;
1527 /* if we don't have indexes we have nothing todo */
1528 if (ltdb->cache->indexlist->num_elements == 0) {
1529 return LDB_SUCCESS;
1532 /* now traverse adding any indexes for normal LDB records */
1533 ret = tdb_traverse(ltdb->tdb, re_index, module);
1534 if (ret == -1) {
1535 return LDB_ERR_OPERATIONS_ERROR;
1538 return LDB_SUCCESS;