s4 dns: Add test to prove two updates in one packet are a FORMERR
[Samba/vl.git] / lib / ldb / ldb_tdb / ldb_index.c
blob24cc93feb94f486e5f5b6033f80c07676a1beef6
1 /*
2 ldb database library
4 Copyright (C) Andrew Tridgell 2004-2009
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 * Name: ldb
27 * Component: ldb tdb backend - indexing
29 * Description: indexing routines for ldb tdb backend
31 * Author: Andrew Tridgell
34 #include "ldb_tdb.h"
/* a list of DN values, the in-memory form of an @IDX index entry */
struct dn_list {
	unsigned int count;	/* number of entries in dn[] */
	struct ldb_val *dn;	/* array of DN values (may be NULL when count is 0) */
};
/* state of the in-memory index kept while a transaction is open */
struct ltdb_idxptr {
	struct tdb_context *itdb;	/* internal tdb: index DN -> pointer to struct dn_list */
	int error;			/* LDB error recorded while storing the index at commit */
};
/*
  An @IDXVERSION attribute is written on every index entry, which
  allows us to tell if the entry was written by an older version.
*/
#define LTDB_INDEXING_VERSION 2
51 /* enable the idxptr mode when transactions start */
52 int ltdb_index_transaction_start(struct ldb_module *module)
54 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
55 ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
56 return LDB_SUCCESS;
59 /* compare two DN entries in a dn_list. Take account of possible
60 * differences in string termination */
61 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
63 if (v1->length > v2->length && v1->data[v2->length] != 0) {
64 return -1;
66 if (v1->length < v2->length && v2->data[v1->length] != 0) {
67 return 1;
69 return strncmp((char *)v1->data, (char *)v2->data, v1->length);
74 find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75 comparison with the dn returns -1 if not found
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
79 unsigned int i;
80 for (i=0; i<list->count; i++) {
81 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
83 return -1;
87 find a entry in a dn_list. Uses a case sensitive comparison with the dn
88 returns -1 if not found
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
92 struct ldb_val v;
93 v.data = discard_const_p(unsigned char, dn);
94 v.length = strlen(dn);
95 return ltdb_dn_list_find_val(list, &v);
99 this is effectively a cast function, but with lots of paranoia
100 checks and also copes with CPUs that are fussy about pointer
101 alignment
103 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
105 struct dn_list *list;
106 if (rec.dsize != sizeof(void *)) {
107 ldb_asprintf_errstring(ldb_module_get_ctx(module),
108 "Bad data size for idxptr %u", (unsigned)rec.dsize);
109 return NULL;
111 /* note that we can't just use a cast here, as rec.dptr may
112 not be aligned sufficiently for a pointer. A cast would cause
113 platforms like some ARM CPUs to crash */
114 memcpy(&list, rec.dptr, sizeof(void *));
115 list = talloc_get_type(list, struct dn_list);
116 if (list == NULL) {
117 ldb_asprintf_errstring(ldb_module_get_ctx(module),
118 "Bad type '%s' for idxptr",
119 talloc_get_name(list));
120 return NULL;
122 if (check_parent && list->dn && talloc_parent(list->dn) != list) {
123 ldb_asprintf_errstring(ldb_module_get_ctx(module),
124 "Bad parent '%s' for idxptr",
125 talloc_get_name(talloc_parent(list->dn)));
126 return NULL;
128 return list;
132 return the @IDX list in an index entry for a dn as a
133 struct dn_list
135 static int ltdb_dn_list_load(struct ldb_module *module,
136 struct ldb_dn *dn, struct dn_list *list)
138 struct ldb_message *msg;
139 int ret;
140 struct ldb_message_element *el;
141 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
142 TDB_DATA rec;
143 struct dn_list *list2;
144 TDB_DATA key;
146 list->dn = NULL;
147 list->count = 0;
149 /* see if we have any in-memory index entries */
150 if (ltdb->idxptr == NULL ||
151 ltdb->idxptr->itdb == NULL) {
152 goto normal_index;
155 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
156 key.dsize = strlen((char *)key.dptr);
158 rec = tdb_fetch_compat(ltdb->idxptr->itdb, key);
159 if (rec.dptr == NULL) {
160 goto normal_index;
163 /* we've found an in-memory index entry */
164 list2 = ltdb_index_idxptr(module, rec, true);
165 if (list2 == NULL) {
166 free(rec.dptr);
167 return LDB_ERR_OPERATIONS_ERROR;
169 free(rec.dptr);
171 *list = *list2;
172 return LDB_SUCCESS;
174 normal_index:
175 msg = ldb_msg_new(list);
176 if (msg == NULL) {
177 return LDB_ERR_OPERATIONS_ERROR;
180 ret = ltdb_search_dn1(module, dn, msg);
181 if (ret != LDB_SUCCESS) {
182 talloc_free(msg);
183 return ret;
186 /* TODO: check indexing version number */
188 el = ldb_msg_find_element(msg, LTDB_IDX);
189 if (!el) {
190 talloc_free(msg);
191 return LDB_SUCCESS;
194 /* we avoid copying the strings by stealing the list */
195 list->dn = talloc_steal(list, el->values);
196 list->count = el->num_values;
198 return LDB_SUCCESS;
203 save a dn_list into a full @IDX style record
205 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn,
206 struct dn_list *list)
208 struct ldb_message *msg;
209 int ret;
211 if (list->count == 0) {
212 ret = ltdb_delete_noindex(module, dn);
213 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
214 return LDB_SUCCESS;
216 return ret;
219 msg = ldb_msg_new(module);
220 if (!msg) {
221 return ldb_module_oom(module);
224 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
225 if (ret != LDB_SUCCESS) {
226 talloc_free(msg);
227 return ldb_module_oom(module);
230 msg->dn = dn;
231 if (list->count > 0) {
232 struct ldb_message_element *el;
234 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
235 if (ret != LDB_SUCCESS) {
236 talloc_free(msg);
237 return ldb_module_oom(module);
239 el->values = list->dn;
240 el->num_values = list->count;
243 ret = ltdb_store(module, msg, TDB_REPLACE);
244 talloc_free(msg);
245 return ret;
249 save a dn_list into the database, in either @IDX or internal format
251 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
252 struct dn_list *list)
254 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
255 TDB_DATA rec, key;
256 int ret;
257 struct dn_list *list2;
259 if (ltdb->idxptr == NULL) {
260 return ltdb_dn_list_store_full(module, dn, list);
263 if (ltdb->idxptr->itdb == NULL) {
264 ltdb->idxptr->itdb = tdb_open_compat(NULL, 1000, TDB_INTERNAL, O_RDWR, 0, NULL, NULL);
265 if (ltdb->idxptr->itdb == NULL) {
266 return LDB_ERR_OPERATIONS_ERROR;
270 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
271 key.dsize = strlen((char *)key.dptr);
273 rec = tdb_fetch_compat(ltdb->idxptr->itdb, key);
274 if (rec.dptr != NULL) {
275 list2 = ltdb_index_idxptr(module, rec, false);
276 if (list2 == NULL) {
277 free(rec.dptr);
278 return LDB_ERR_OPERATIONS_ERROR;
280 free(rec.dptr);
281 list2->dn = talloc_steal(list2, list->dn);
282 list2->count = list->count;
283 return LDB_SUCCESS;
286 list2 = talloc(ltdb->idxptr, struct dn_list);
287 if (list2 == NULL) {
288 return LDB_ERR_OPERATIONS_ERROR;
290 list2->dn = talloc_steal(list2, list->dn);
291 list2->count = list->count;
293 rec.dptr = (uint8_t *)&list2;
294 rec.dsize = sizeof(void *);
296 ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
297 if (ret != 0) {
298 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
300 return LDB_SUCCESS;
304 traverse function for storing the in-memory index entries on disk
306 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
308 struct ldb_module *module = state;
309 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
310 struct ldb_dn *dn;
311 struct ldb_context *ldb = ldb_module_get_ctx(module);
312 struct ldb_val v;
313 struct dn_list *list;
315 list = ltdb_index_idxptr(module, data, true);
316 if (list == NULL) {
317 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
318 return -1;
321 v.data = key.dptr;
322 v.length = strnlen((char *)key.dptr, key.dsize);
324 dn = ldb_dn_from_ldb_val(module, ldb, &v);
325 if (dn == NULL) {
326 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
327 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
328 return -1;
331 ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
332 talloc_free(dn);
333 if (ltdb->idxptr->error != 0) {
334 return -1;
336 return 0;
339 /* cleanup the idxptr mode when transaction commits */
340 int ltdb_index_transaction_commit(struct ldb_module *module)
342 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
343 int ret;
345 struct ldb_context *ldb = ldb_module_get_ctx(module);
347 ldb_reset_err_string(ldb);
349 if (ltdb->idxptr->itdb) {
350 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
351 tdb_close(ltdb->idxptr->itdb);
354 ret = ltdb->idxptr->error;
355 if (ret != LDB_SUCCESS) {
356 if (!ldb_errstring(ldb)) {
357 ldb_set_errstring(ldb, ldb_strerror(ret));
359 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
362 talloc_free(ltdb->idxptr);
363 ltdb->idxptr = NULL;
364 return ret;
367 /* cleanup the idxptr mode when transaction cancels */
368 int ltdb_index_transaction_cancel(struct ldb_module *module)
370 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
371 if (ltdb->idxptr && ltdb->idxptr->itdb) {
372 tdb_close(ltdb->idxptr->itdb);
374 talloc_free(ltdb->idxptr);
375 ltdb->idxptr = NULL;
376 return LDB_SUCCESS;
381 return the dn key to be used for an index
382 the caller is responsible for freeing
384 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
385 const char *attr, const struct ldb_val *value,
386 const struct ldb_schema_attribute **ap)
388 struct ldb_dn *ret;
389 struct ldb_val v;
390 const struct ldb_schema_attribute *a;
391 char *attr_folded;
392 int r;
394 attr_folded = ldb_attr_casefold(ldb, attr);
395 if (!attr_folded) {
396 return NULL;
399 a = ldb_schema_attribute_by_name(ldb, attr);
400 if (ap) {
401 *ap = a;
403 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
404 if (r != LDB_SUCCESS) {
405 const char *errstr = ldb_errstring(ldb);
406 /* canonicalisation can be refused. For example,
407 a attribute that takes wildcards will refuse to canonicalise
408 if the value contains a wildcard */
409 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
410 attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
411 talloc_free(attr_folded);
412 return NULL;
414 if (ldb_should_b64_encode(ldb, &v)) {
415 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
416 if (!vstr) {
417 talloc_free(attr_folded);
418 return NULL;
420 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
421 talloc_free(vstr);
422 } else {
423 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
426 if (v.data != value->data) {
427 talloc_free(v.data);
429 talloc_free(attr_folded);
431 return ret;
435 see if a attribute value is in the list of indexed attributes
437 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
439 unsigned int i;
440 struct ldb_message_element *el;
442 el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
443 if (el == NULL) {
444 return false;
447 /* TODO: this is too expensive! At least use a binary search */
448 for (i=0; i<el->num_values; i++) {
449 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
450 return true;
453 return false;
457 in the following logic functions, the return value is treated as
458 follows:
460 LDB_SUCCESS: we found some matching index values
462 LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
464 LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
465 we'll need a full search
469 return a list of dn's that might match a simple indexed search (an
470 equality search only)
472 static int ltdb_index_dn_simple(struct ldb_module *module,
473 const struct ldb_parse_tree *tree,
474 const struct ldb_message *index_list,
475 struct dn_list *list)
477 struct ldb_context *ldb;
478 struct ldb_dn *dn;
479 int ret;
481 ldb = ldb_module_get_ctx(module);
483 list->count = 0;
484 list->dn = NULL;
486 /* if the attribute isn't in the list of indexed attributes then
487 this node needs a full search */
488 if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
489 return LDB_ERR_OPERATIONS_ERROR;
492 /* the attribute is indexed. Pull the list of DNs that match the
493 search criterion */
494 dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
495 if (!dn) return LDB_ERR_OPERATIONS_ERROR;
497 ret = ltdb_dn_list_load(module, dn, list);
498 talloc_free(dn);
499 return ret;
503 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
506 return a list of dn's that might match a leaf indexed search
508 static int ltdb_index_dn_leaf(struct ldb_module *module,
509 const struct ldb_parse_tree *tree,
510 const struct ldb_message *index_list,
511 struct dn_list *list)
513 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
514 list->dn = talloc_array(list, struct ldb_val, 1);
515 if (list->dn == NULL) {
516 ldb_module_oom(module);
517 return LDB_ERR_OPERATIONS_ERROR;
519 list->dn[0] = tree->u.equality.value;
520 list->count = 1;
521 return LDB_SUCCESS;
523 return ltdb_index_dn_simple(module, tree, index_list, list);
528 list intersection
529 list = list & list2
531 static bool list_intersect(struct ldb_context *ldb,
532 struct dn_list *list, const struct dn_list *list2)
534 struct dn_list *list3;
535 unsigned int i;
537 if (list->count == 0) {
538 /* 0 & X == 0 */
539 return true;
541 if (list2->count == 0) {
542 /* X & 0 == 0 */
543 list->count = 0;
544 list->dn = NULL;
545 return true;
548 /* the indexing code is allowed to return a longer list than
549 what really matches, as all results are filtered by the
550 full expression at the end - this shortcut avoids a lot of
551 work in some cases */
552 if (list->count < 2 && list2->count > 10) {
553 return true;
555 if (list2->count < 2 && list->count > 10) {
556 list->count = list2->count;
557 list->dn = list2->dn;
558 /* note that list2 may not be the parent of list2->dn,
559 as list2->dn may be owned by ltdb->idxptr. In that
560 case we expect this reparent call to fail, which is
561 OK */
562 talloc_reparent(list2, list, list2->dn);
563 return true;
566 list3 = talloc_zero(list, struct dn_list);
567 if (list3 == NULL) {
568 return false;
571 list3->dn = talloc_array(list3, struct ldb_val, list->count);
572 if (!list3->dn) {
573 talloc_free(list3);
574 return false;
576 list3->count = 0;
578 for (i=0;i<list->count;i++) {
579 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
580 list3->dn[list3->count] = list->dn[i];
581 list3->count++;
585 list->dn = talloc_steal(list, list3->dn);
586 list->count = list3->count;
587 talloc_free(list3);
589 return true;
594 list union
595 list = list | list2
597 static bool list_union(struct ldb_context *ldb,
598 struct dn_list *list, const struct dn_list *list2)
600 struct ldb_val *dn3;
602 if (list2->count == 0) {
603 /* X | 0 == X */
604 return true;
607 if (list->count == 0) {
608 /* 0 | X == X */
609 list->count = list2->count;
610 list->dn = list2->dn;
611 /* note that list2 may not be the parent of list2->dn,
612 as list2->dn may be owned by ltdb->idxptr. In that
613 case we expect this reparent call to fail, which is
614 OK */
615 talloc_reparent(list2, list, list2->dn);
616 return true;
619 dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
620 if (!dn3) {
621 ldb_oom(ldb);
622 return false;
625 /* we allow for duplicates here, and get rid of them later */
626 memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
627 memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
629 list->dn = dn3;
630 list->count += list2->count;
632 return true;
635 static int ltdb_index_dn(struct ldb_module *module,
636 const struct ldb_parse_tree *tree,
637 const struct ldb_message *index_list,
638 struct dn_list *list);
642 process an OR list (a union)
644 static int ltdb_index_dn_or(struct ldb_module *module,
645 const struct ldb_parse_tree *tree,
646 const struct ldb_message *index_list,
647 struct dn_list *list)
649 struct ldb_context *ldb;
650 unsigned int i;
652 ldb = ldb_module_get_ctx(module);
654 list->dn = NULL;
655 list->count = 0;
657 for (i=0; i<tree->u.list.num_elements; i++) {
658 struct dn_list *list2;
659 int ret;
661 list2 = talloc_zero(list, struct dn_list);
662 if (list2 == NULL) {
663 return LDB_ERR_OPERATIONS_ERROR;
666 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
668 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
669 /* X || 0 == X */
670 talloc_free(list2);
671 continue;
674 if (ret != LDB_SUCCESS) {
675 /* X || * == * */
676 talloc_free(list2);
677 return ret;
680 if (!list_union(ldb, list, list2)) {
681 talloc_free(list2);
682 return LDB_ERR_OPERATIONS_ERROR;
686 if (list->count == 0) {
687 return LDB_ERR_NO_SUCH_OBJECT;
690 return LDB_SUCCESS;
695 NOT an index results
697 static int ltdb_index_dn_not(struct ldb_module *module,
698 const struct ldb_parse_tree *tree,
699 const struct ldb_message *index_list,
700 struct dn_list *list)
702 /* the only way to do an indexed not would be if we could
703 negate the not via another not or if we knew the total
704 number of database elements so we could know that the
705 existing expression covered the whole database.
707 instead, we just give up, and rely on a full index scan
708 (unless an outer & manages to reduce the list)
710 return LDB_ERR_OPERATIONS_ERROR;
714 static bool ltdb_index_unique(struct ldb_context *ldb,
715 const char *attr)
717 const struct ldb_schema_attribute *a;
718 a = ldb_schema_attribute_by_name(ldb, attr);
719 if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
720 return true;
722 return false;
726 process an AND expression (intersection)
728 static int ltdb_index_dn_and(struct ldb_module *module,
729 const struct ldb_parse_tree *tree,
730 const struct ldb_message *index_list,
731 struct dn_list *list)
733 struct ldb_context *ldb;
734 unsigned int i;
735 bool found;
737 ldb = ldb_module_get_ctx(module);
739 list->dn = NULL;
740 list->count = 0;
742 /* in the first pass we only look for unique simple
743 equality tests, in the hope of avoiding having to look
744 at any others */
745 for (i=0; i<tree->u.list.num_elements; i++) {
746 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
747 int ret;
749 if (subtree->operation != LDB_OP_EQUALITY ||
750 !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
751 continue;
754 ret = ltdb_index_dn(module, subtree, index_list, list);
755 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
756 /* 0 && X == 0 */
757 return LDB_ERR_NO_SUCH_OBJECT;
759 if (ret == LDB_SUCCESS) {
760 /* a unique index match means we can
761 * stop. Note that we don't care if we return
762 * a few too many objects, due to later
763 * filtering */
764 return LDB_SUCCESS;
768 /* now do a full intersection */
769 found = false;
771 for (i=0; i<tree->u.list.num_elements; i++) {
772 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
773 struct dn_list *list2;
774 int ret;
776 list2 = talloc_zero(list, struct dn_list);
777 if (list2 == NULL) {
778 return ldb_module_oom(module);
781 ret = ltdb_index_dn(module, subtree, index_list, list2);
783 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
784 /* X && 0 == 0 */
785 list->dn = NULL;
786 list->count = 0;
787 talloc_free(list2);
788 return LDB_ERR_NO_SUCH_OBJECT;
791 if (ret != LDB_SUCCESS) {
792 /* this didn't adding anything */
793 talloc_free(list2);
794 continue;
797 if (!found) {
798 talloc_reparent(list2, list, list->dn);
799 list->dn = list2->dn;
800 list->count = list2->count;
801 found = true;
802 } else if (!list_intersect(ldb, list, list2)) {
803 talloc_free(list2);
804 return LDB_ERR_OPERATIONS_ERROR;
807 if (list->count == 0) {
808 list->dn = NULL;
809 return LDB_ERR_NO_SUCH_OBJECT;
812 if (list->count < 2) {
813 /* it isn't worth loading the next part of the tree */
814 return LDB_SUCCESS;
818 if (!found) {
819 /* none of the attributes were indexed */
820 return LDB_ERR_OPERATIONS_ERROR;
823 return LDB_SUCCESS;
827 return a list of matching objects using a one-level index
829 static int ltdb_index_dn_one(struct ldb_module *module,
830 struct ldb_dn *parent_dn,
831 struct dn_list *list)
833 struct ldb_context *ldb;
834 struct ldb_dn *key;
835 struct ldb_val val;
836 int ret;
838 ldb = ldb_module_get_ctx(module);
840 /* work out the index key from the parent DN */
841 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
842 val.length = strlen((char *)val.data);
843 key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
844 if (!key) {
845 ldb_oom(ldb);
846 return LDB_ERR_OPERATIONS_ERROR;
849 ret = ltdb_dn_list_load(module, key, list);
850 talloc_free(key);
851 if (ret != LDB_SUCCESS) {
852 return ret;
855 if (list->count == 0) {
856 return LDB_ERR_NO_SUCH_OBJECT;
859 return LDB_SUCCESS;
863 return a list of dn's that might match a indexed search or
864 an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
866 static int ltdb_index_dn(struct ldb_module *module,
867 const struct ldb_parse_tree *tree,
868 const struct ldb_message *index_list,
869 struct dn_list *list)
871 int ret = LDB_ERR_OPERATIONS_ERROR;
873 switch (tree->operation) {
874 case LDB_OP_AND:
875 ret = ltdb_index_dn_and(module, tree, index_list, list);
876 break;
878 case LDB_OP_OR:
879 ret = ltdb_index_dn_or(module, tree, index_list, list);
880 break;
882 case LDB_OP_NOT:
883 ret = ltdb_index_dn_not(module, tree, index_list, list);
884 break;
886 case LDB_OP_EQUALITY:
887 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
888 break;
890 case LDB_OP_SUBSTRING:
891 case LDB_OP_GREATER:
892 case LDB_OP_LESS:
893 case LDB_OP_PRESENT:
894 case LDB_OP_APPROX:
895 case LDB_OP_EXTENDED:
896 /* we can't index with fancy bitops yet */
897 ret = LDB_ERR_OPERATIONS_ERROR;
898 break;
901 return ret;
905 filter a candidate dn_list from an indexed search into a set of results
906 extracting just the given attributes
908 static int ltdb_index_filter(const struct dn_list *dn_list,
909 struct ltdb_context *ac,
910 uint32_t *match_count)
912 struct ldb_context *ldb;
913 struct ldb_message *msg;
914 unsigned int i;
916 ldb = ldb_module_get_ctx(ac->module);
918 for (i = 0; i < dn_list->count; i++) {
919 struct ldb_dn *dn;
920 int ret;
921 bool matched;
923 msg = ldb_msg_new(ac);
924 if (!msg) {
925 return LDB_ERR_OPERATIONS_ERROR;
928 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
929 if (dn == NULL) {
930 talloc_free(msg);
931 return LDB_ERR_OPERATIONS_ERROR;
934 ret = ltdb_search_dn1(ac->module, dn, msg);
935 talloc_free(dn);
936 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
937 /* the record has disappeared? yes, this can happen */
938 talloc_free(msg);
939 continue;
942 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
943 /* an internal error */
944 talloc_free(msg);
945 return LDB_ERR_OPERATIONS_ERROR;
948 ret = ldb_match_msg_error(ldb, msg,
949 ac->tree, ac->base, ac->scope, &matched);
950 if (ret != LDB_SUCCESS) {
951 talloc_free(msg);
952 return ret;
954 if (!matched) {
955 talloc_free(msg);
956 continue;
959 /* filter the attributes that the user wants */
960 ret = ltdb_filter_attrs(msg, ac->attrs);
962 if (ret == -1) {
963 talloc_free(msg);
964 return LDB_ERR_OPERATIONS_ERROR;
967 ret = ldb_module_send_entry(ac->req, msg, NULL);
968 if (ret != LDB_SUCCESS) {
969 /* Regardless of success or failure, the msg
970 * is the callbacks responsiblity, and should
971 * not be talloc_free()'ed */
972 ac->request_terminated = true;
973 return ret;
976 (*match_count)++;
979 return LDB_SUCCESS;
983 remove any duplicated entries in a indexed result
985 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
987 unsigned int i, new_count;
989 if (list->count < 2) {
990 return;
993 TYPESAFE_QSORT(list->dn, list->count, dn_list_cmp);
995 new_count = 1;
996 for (i=1; i<list->count; i++) {
997 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
998 if (new_count != i) {
999 list->dn[new_count] = list->dn[i];
1001 new_count++;
1005 list->count = new_count;
1009 search the database with a LDAP-like expression using indexes
1010 returns -1 if an indexed search is not possible, in which
1011 case the caller should call ltdb_search_full()
1013 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1015 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1016 struct dn_list *dn_list;
1017 int ret;
1019 /* see if indexing is enabled */
1020 if (!ltdb->cache->attribute_indexes &&
1021 !ltdb->cache->one_level_indexes &&
1022 ac->scope != LDB_SCOPE_BASE) {
1023 /* fallback to a full search */
1024 return LDB_ERR_OPERATIONS_ERROR;
1027 dn_list = talloc_zero(ac, struct dn_list);
1028 if (dn_list == NULL) {
1029 return ldb_module_oom(ac->module);
1032 switch (ac->scope) {
1033 case LDB_SCOPE_BASE:
1034 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1035 if (dn_list->dn == NULL) {
1036 talloc_free(dn_list);
1037 return ldb_module_oom(ac->module);
1039 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1040 if (dn_list->dn[0].data == NULL) {
1041 talloc_free(dn_list);
1042 return ldb_module_oom(ac->module);
1044 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1045 dn_list->count = 1;
1046 break;
1048 case LDB_SCOPE_ONELEVEL:
1049 if (!ltdb->cache->one_level_indexes) {
1050 talloc_free(dn_list);
1051 return LDB_ERR_OPERATIONS_ERROR;
1053 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1054 if (ret != LDB_SUCCESS) {
1055 talloc_free(dn_list);
1056 return ret;
1058 break;
1060 case LDB_SCOPE_SUBTREE:
1061 case LDB_SCOPE_DEFAULT:
1062 if (!ltdb->cache->attribute_indexes) {
1063 talloc_free(dn_list);
1064 return LDB_ERR_OPERATIONS_ERROR;
1066 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1067 if (ret != LDB_SUCCESS) {
1068 talloc_free(dn_list);
1069 return ret;
1071 ltdb_dn_list_remove_duplicates(dn_list);
1072 break;
1075 ret = ltdb_index_filter(dn_list, ac, match_count);
1076 talloc_free(dn_list);
1077 return ret;
1081 add an index entry for one message element
1083 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1084 struct ldb_message_element *el, int v_idx)
1086 struct ldb_context *ldb;
1087 struct ldb_dn *dn_key;
1088 int ret;
1089 const struct ldb_schema_attribute *a;
1090 struct dn_list *list;
1091 unsigned alloc_len;
1093 ldb = ldb_module_get_ctx(module);
1095 list = talloc_zero(module, struct dn_list);
1096 if (list == NULL) {
1097 return LDB_ERR_OPERATIONS_ERROR;
1100 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1101 if (!dn_key) {
1102 talloc_free(list);
1103 return LDB_ERR_OPERATIONS_ERROR;
1105 talloc_steal(list, dn_key);
1107 ret = ltdb_dn_list_load(module, dn_key, list);
1108 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1109 talloc_free(list);
1110 return ret;
1113 if (ltdb_dn_list_find_str(list, dn) != -1) {
1114 talloc_free(list);
1115 return LDB_SUCCESS;
1118 if (list->count > 0 &&
1119 a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1120 talloc_free(list);
1121 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1122 el->name, dn);
1123 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1126 /* overallocate the list a bit, to reduce the number of
1127 * realloc trigered copies */
1128 alloc_len = ((list->count+1)+7) & ~7;
1129 list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1130 if (list->dn == NULL) {
1131 talloc_free(list);
1132 return LDB_ERR_OPERATIONS_ERROR;
1134 list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1135 list->dn[list->count].length = strlen(dn);
1136 list->count++;
1138 ret = ltdb_dn_list_store(module, dn_key, list);
1140 talloc_free(list);
1142 return ret;
1146 add index entries for one elements in a message
1148 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1149 struct ldb_message_element *el)
1151 unsigned int i;
1152 for (i = 0; i < el->num_values; i++) {
1153 int ret = ltdb_index_add1(module, dn, el, i);
1154 if (ret != LDB_SUCCESS) {
1155 return ret;
1159 return LDB_SUCCESS;
1163 add index entries for all elements in a message
1165 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1166 struct ldb_message_element *elements, int num_el)
1168 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1169 unsigned int i;
1171 if (dn[0] == '@') {
1172 return LDB_SUCCESS;
1175 if (ltdb->cache->indexlist->num_elements == 0) {
1176 /* no indexed fields */
1177 return LDB_SUCCESS;
1180 for (i = 0; i < num_el; i++) {
1181 int ret;
1182 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1183 continue;
1185 ret = ltdb_index_add_el(module, dn, &elements[i]);
1186 if (ret != LDB_SUCCESS) {
1187 struct ldb_context *ldb = ldb_module_get_ctx(module);
1188 ldb_asprintf_errstring(ldb,
1189 __location__ ": Failed to re-index %s in %s - %s",
1190 elements[i].name, dn, ldb_errstring(ldb));
1191 return ret;
1195 return LDB_SUCCESS;
1200 insert a one level index for a message
1202 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1204 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1205 struct ldb_message_element el;
1206 struct ldb_val val;
1207 struct ldb_dn *pdn;
1208 const char *dn;
1209 int ret;
1211 /* We index for ONE Level only if requested */
1212 if (!ltdb->cache->one_level_indexes) {
1213 return LDB_SUCCESS;
1216 pdn = ldb_dn_get_parent(module, msg->dn);
1217 if (pdn == NULL) {
1218 return LDB_ERR_OPERATIONS_ERROR;
1221 dn = ldb_dn_get_linearized(msg->dn);
1222 if (dn == NULL) {
1223 talloc_free(pdn);
1224 return LDB_ERR_OPERATIONS_ERROR;
1227 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1228 if (val.data == NULL) {
1229 talloc_free(pdn);
1230 return LDB_ERR_OPERATIONS_ERROR;
1233 val.length = strlen((char *)val.data);
1234 el.name = LTDB_IDXONE;
1235 el.values = &val;
1236 el.num_values = 1;
1238 if (add) {
1239 ret = ltdb_index_add1(module, dn, &el, 0);
1240 } else { /* delete */
1241 ret = ltdb_index_del_value(module, msg->dn, &el, 0);
1244 talloc_free(pdn);
1246 return ret;
1250 add the index entries for a new element in a record
1251 The caller guarantees that these element values are not yet indexed
1253 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn,
1254 struct ldb_message_element *el)
1256 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1257 if (ldb_dn_is_special(dn)) {
1258 return LDB_SUCCESS;
1260 if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1261 return LDB_SUCCESS;
1263 return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1267 add the index entries for a new record
1269 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1271 const char *dn;
1272 int ret;
1274 if (ldb_dn_is_special(msg->dn)) {
1275 return LDB_SUCCESS;
1278 dn = ldb_dn_get_linearized(msg->dn);
1279 if (dn == NULL) {
1280 return LDB_ERR_OPERATIONS_ERROR;
1283 ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1284 if (ret != LDB_SUCCESS) {
1285 return ret;
1288 return ltdb_index_onelevel(module, msg, 1);
1293 delete an index entry for one message element
1295 int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
1296 struct ldb_message_element *el, unsigned int v_idx)
1298 struct ldb_context *ldb;
1299 struct ldb_dn *dn_key;
1300 const char *dn_str;
1301 int ret, i;
1302 unsigned int j;
1303 struct dn_list *list;
1305 ldb = ldb_module_get_ctx(module);
1307 dn_str = ldb_dn_get_linearized(dn);
1308 if (dn_str == NULL) {
1309 return LDB_ERR_OPERATIONS_ERROR;
1312 if (dn_str[0] == '@') {
1313 return LDB_SUCCESS;
1316 dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1317 if (!dn_key) {
1318 return LDB_ERR_OPERATIONS_ERROR;
1321 list = talloc_zero(dn_key, struct dn_list);
1322 if (list == NULL) {
1323 talloc_free(dn_key);
1324 return LDB_ERR_OPERATIONS_ERROR;
1327 ret = ltdb_dn_list_load(module, dn_key, list);
1328 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1329 /* it wasn't indexed. Did we have an earlier error? If we did then
1330 its gone now */
1331 talloc_free(dn_key);
1332 return LDB_SUCCESS;
1335 if (ret != LDB_SUCCESS) {
1336 talloc_free(dn_key);
1337 return ret;
1340 i = ltdb_dn_list_find_str(list, dn_str);
1341 if (i == -1) {
1342 /* nothing to delete */
1343 talloc_free(dn_key);
1344 return LDB_SUCCESS;
1347 j = (unsigned int) i;
1348 if (j != list->count - 1) {
1349 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1351 list->count--;
1352 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1354 ret = ltdb_dn_list_store(module, dn_key, list);
1356 talloc_free(dn_key);
1358 return ret;
1362 delete the index entries for a element
1363 return -1 on failure
1365 int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
1366 struct ldb_message_element *el)
1368 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1369 const char *dn_str;
1370 int ret;
1371 unsigned int i;
1373 if (!ltdb->cache->attribute_indexes) {
1374 /* no indexed fields */
1375 return LDB_SUCCESS;
1378 dn_str = ldb_dn_get_linearized(dn);
1379 if (dn_str == NULL) {
1380 return LDB_ERR_OPERATIONS_ERROR;
1383 if (dn_str[0] == '@') {
1384 return LDB_SUCCESS;
1387 if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1388 return LDB_SUCCESS;
1390 for (i = 0; i < el->num_values; i++) {
1391 ret = ltdb_index_del_value(module, dn, el, i);
1392 if (ret != LDB_SUCCESS) {
1393 return ret;
1397 return LDB_SUCCESS;
1401 delete the index entries for a record
1402 return -1 on failure
1404 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1406 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1407 int ret;
1408 unsigned int i;
1410 if (ldb_dn_is_special(msg->dn)) {
1411 return LDB_SUCCESS;
1414 ret = ltdb_index_onelevel(module, msg, 0);
1415 if (ret != LDB_SUCCESS) {
1416 return ret;
1419 if (!ltdb->cache->attribute_indexes) {
1420 /* no indexed fields */
1421 return LDB_SUCCESS;
1424 for (i = 0; i < msg->num_elements; i++) {
1425 ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
1426 if (ret != LDB_SUCCESS) {
1427 return ret;
1431 return LDB_SUCCESS;
1436 traversal function that deletes all @INDEX records
1438 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1440 struct ldb_module *module = state;
1441 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1442 const char *dnstr = "DN=" LTDB_INDEX ":";
1443 struct dn_list list;
1444 struct ldb_dn *dn;
1445 struct ldb_val v;
1446 int ret;
1448 if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1449 return 0;
1451 /* we need to put a empty list in the internal tdb for this
1452 * index entry */
1453 list.dn = NULL;
1454 list.count = 0;
1456 /* the offset of 3 is to remove the DN= prefix. */
1457 v.data = key.dptr + 3;
1458 v.length = strnlen((char *)key.dptr, key.dsize) - 3;
1460 dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1461 ret = ltdb_dn_list_store(module, dn, &list);
1462 if (ret != LDB_SUCCESS) {
1463 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1464 "Unable to store null index for %s\n",
1465 ldb_dn_get_linearized(dn));
1466 talloc_free(dn);
1467 return -1;
1469 talloc_free(dn);
1470 return 0;
/*
  state handed to the re_index() traversal callback
*/
struct ltdb_reindex_context {
	/* module whose records are being re-indexed */
	struct ldb_module *module;
	/* error code recorded by the callback; checked by ltdb_reindex()
	 * against LDB_SUCCESS once the traversal has finished */
	int error;
};
1479 traversal function that adds @INDEX records during a re index
1481 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1483 struct ldb_context *ldb;
1484 struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1485 struct ldb_module *module = ctx->module;
1486 struct ldb_message *msg;
1487 const char *dn = NULL;
1488 int ret;
1489 TDB_DATA key2;
1491 ldb = ldb_module_get_ctx(module);
1493 if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1494 strncmp((char *)key.dptr, "DN=", 3) != 0) {
1495 return 0;
1498 msg = ldb_msg_new(module);
1499 if (msg == NULL) {
1500 return -1;
1503 ret = ltdb_unpack_data(module, &data, msg);
1504 if (ret != 0) {
1505 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1506 ldb_dn_get_linearized(msg->dn));
1507 talloc_free(msg);
1508 return -1;
1511 /* check if the DN key has changed, perhaps due to the
1512 case insensitivity of an element changing */
1513 key2 = ltdb_key(module, msg->dn);
1514 if (key2.dptr == NULL) {
1515 /* probably a corrupt record ... darn */
1516 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1517 ldb_dn_get_linearized(msg->dn));
1518 talloc_free(msg);
1519 return 0;
1521 if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1522 tdb_delete(tdb, key);
1523 tdb_store(tdb, key2, data, 0);
1525 talloc_free(key2.dptr);
1527 if (msg->dn == NULL) {
1528 dn = (char *)key.dptr + 3;
1529 } else {
1530 dn = ldb_dn_get_linearized(msg->dn);
1533 ret = ltdb_index_onelevel(module, msg, 1);
1534 if (ret != LDB_SUCCESS) {
1535 ldb_debug(ldb, LDB_DEBUG_ERROR,
1536 "Adding special ONE LEVEL index failed (%s)!",
1537 ldb_dn_get_linearized(msg->dn));
1538 talloc_free(msg);
1539 return -1;
1542 ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1544 if (ret != LDB_SUCCESS) {
1545 ctx->error = ret;
1546 talloc_free(msg);
1547 return -1;
1550 talloc_free(msg);
1552 return 0;
1556 force a complete reindex of the database
1558 int ltdb_reindex(struct ldb_module *module)
1560 struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1561 int ret;
1562 struct ltdb_reindex_context ctx;
1564 if (ltdb_cache_reload(module) != 0) {
1565 return LDB_ERR_OPERATIONS_ERROR;
1568 /* first traverse the database deleting any @INDEX records by
1569 * putting NULL entries in the in-memory tdb
1571 ret = tdb_traverse(ltdb->tdb, delete_index, module);
1572 if (ret < 0) {
1573 return LDB_ERR_OPERATIONS_ERROR;
1576 /* if we don't have indexes we have nothing todo */
1577 if (ltdb->cache->indexlist->num_elements == 0) {
1578 return LDB_SUCCESS;
1581 ctx.module = module;
1582 ctx.error = 0;
1584 /* now traverse adding any indexes for normal LDB records */
1585 ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
1586 if (ret < 0) {
1587 struct ldb_context *ldb = ldb_module_get_ctx(module);
1588 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
1589 return LDB_ERR_OPERATIONS_ERROR;
1592 if (ctx.error != LDB_SUCCESS) {
1593 struct ldb_context *ldb = ldb_module_get_ctx(module);
1594 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1595 return ctx.error;
1598 return LDB_SUCCESS;