lib ldb key value: fix index buffering
[Samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
blob2671d55a4d348c5aac989acf2ade60bda0152859
1 /*
2 ldb database library
4 Copyright (C) Andrew Tridgell 2004-2009
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 * Name: ldb
27 * Component: ldb key value backend - indexing
29 * Description: indexing routines for ldb key value backend
31 * Author: Andrew Tridgell
36 LDB Index design and choice of key:
37 =======================================
39 LDB has index records held as LDB objects with a special record like:
41 dn: @INDEX:attr:value
43 value may be base64 encoded, if it is deemed not printable:
45 dn: @INDEX:attr::base64-value
47 In each record, there is two possible formats:
49 The original format is:
50 -----------------------
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
56 In this format, @IDX is multi-valued, one entry for each match
58 The corrosponding entry is stored in a TDB record with key:
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
65 The original mixed-case DN is stored in the entry iself.
68 The new 'GUID index' format is:
69 -------------------------------
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed. The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
82 The corrosponding entry is stored in a TDB record with key:
84 GUID=<binary GUID>
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
95 Exception for special @ DNs:
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
102 Control points for choice of index mode
103 ---------------------------------------
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
112 By default, the original DN format is used.
115 Control points for choosing indexed attributes
116 ----------------------------------------------
118 @IDXATTR controls if an attribute is indexed
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
125 C Override functions
126 --------------------
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129 const char *GUID_index_attribute,
130 const char *GUID_index_dn_component)
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136 bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138 ldb_attribute_handler_override_fn_t override,
139 void *private_data);
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151 #include "lib/util/attr.h"
153 struct dn_list {
154 unsigned int count;
155 struct ldb_val *dn;
157 * Do not optimise the intersection of this list,
158 * we must never return an entry not in this
159 * list. This allows the index for
160 * SCOPE_ONELEVEL to be trusted.
162 bool strict;
165 struct ldb_kv_idxptr {
167 * In memory tdb to cache the index updates performed during a
168 * transaction. This improves the performance of operations like
169 * re-index and join
171 struct tdb_context *itdb;
172 int error;
175 enum key_truncation {
176 KEY_NOT_TRUNCATED,
177 KEY_TRUNCATED,
180 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
181 const struct ldb_message *msg,
182 int add);
183 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
184 struct ldb_kv_private *ldb_kv,
185 struct ldb_dn *base_dn,
186 struct dn_list *dn_list,
187 enum key_truncation *truncation);
189 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
190 struct dn_list *list);
192 /* we put a @IDXVERSION attribute on index entries. This
193 allows us to tell if it was written by an older version
195 #define LDB_KV_INDEXING_VERSION 2
197 #define LDB_KV_GUID_INDEXING_VERSION 3
199 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
201 if (ldb_kv->max_key_length == 0) {
202 return UINT_MAX;
204 return ldb_kv->max_key_length;
207 /* enable the idxptr mode when transactions start */
208 int ldb_kv_index_transaction_start(
209 struct ldb_module *module,
210 size_t cache_size)
212 struct ldb_kv_private *ldb_kv = talloc_get_type(
213 ldb_module_get_private(module), struct ldb_kv_private);
214 ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
215 if (ldb_kv->idxptr == NULL) {
216 return ldb_oom(ldb_module_get_ctx(module));
219 ldb_kv->idxptr->itdb = tdb_open(
220 NULL,
221 cache_size,
222 TDB_INTERNAL,
223 O_RDWR,
225 if (ldb_kv->idxptr->itdb == NULL) {
226 return LDB_ERR_OPERATIONS_ERROR;
229 return LDB_SUCCESS;
233 see if two ldb_val structures contain exactly the same data
234 return -1 or 1 for a mismatch, 0 for match
236 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
237 const struct ldb_val *v2)
239 if (v1->length > v2->length) {
240 return -1;
242 if (v1->length < v2->length) {
243 return 1;
245 return memcmp(v1->data, v2->data, v1->length);
249 see if two ldb_val structures contain exactly the same data
250 return -1 or 1 for a mismatch, 0 for match
252 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
253 const struct ldb_val *v2)
255 if (v1.length > v2->length) {
256 return -1;
258 if (v1.length < v2->length) {
259 return 1;
261 return memcmp(v1.data, v2->data, v1.length);
266 find a entry in a dn_list, using a ldb_val. Uses a case sensitive
267 binary-safe comparison for the 'dn' returns -1 if not found
269 This is therefore safe when the value is a GUID in the future
271 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
272 const struct dn_list *list,
273 const struct ldb_val *v)
275 unsigned int i;
276 struct ldb_val *exact = NULL, *next = NULL;
278 if (ldb_kv->cache->GUID_index_attribute == NULL) {
279 for (i=0; i<list->count; i++) {
280 if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
281 return i;
284 return -1;
287 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
288 *v, ldb_val_equal_exact_ordered,
289 exact, next);
290 if (exact == NULL) {
291 return -1;
293 /* Not required, but keeps the compiler quiet */
294 if (next != NULL) {
295 return -1;
298 i = exact - list->dn;
299 return i;
303 find a entry in a dn_list. Uses a case sensitive comparison with the dn
304 returns -1 if not found
306 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
307 struct dn_list *list,
308 const struct ldb_message *msg)
310 struct ldb_val v;
311 const struct ldb_val *key_val;
312 if (ldb_kv->cache->GUID_index_attribute == NULL) {
313 const char *dn_str = ldb_dn_get_linearized(msg->dn);
314 v.data = discard_const_p(unsigned char, dn_str);
315 v.length = strlen(dn_str);
316 } else {
317 key_val = ldb_msg_find_ldb_val(
318 msg, ldb_kv->cache->GUID_index_attribute);
319 if (key_val == NULL) {
320 return -1;
322 v = *key_val;
324 return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
328 this is effectively a cast function, but with lots of paranoia
329 checks and also copes with CPUs that are fussy about pointer
330 alignment
332 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
333 TDB_DATA rec)
335 struct dn_list *list;
336 if (rec.dsize != sizeof(void *)) {
337 ldb_asprintf_errstring(ldb_module_get_ctx(module),
338 "Bad data size for idxptr %u", (unsigned)rec.dsize);
339 return NULL;
341 /* note that we can't just use a cast here, as rec.dptr may
342 not be aligned sufficiently for a pointer. A cast would cause
343 platforms like some ARM CPUs to crash */
344 memcpy(&list, rec.dptr, sizeof(void *));
345 list = talloc_get_type(list, struct dn_list);
346 if (list == NULL) {
347 ldb_asprintf_errstring(ldb_module_get_ctx(module),
348 "Bad type '%s' for idxptr",
349 talloc_get_name(list));
350 return NULL;
352 return list;
355 enum dn_list_will_be_read_only {
356 DN_LIST_MUTABLE = 0,
357 DN_LIST_WILL_BE_READ_ONLY = 1,
361 return the @IDX list in an index entry for a dn as a
362 struct dn_list
364 static int ldb_kv_dn_list_load(struct ldb_module *module,
365 struct ldb_kv_private *ldb_kv,
366 struct ldb_dn *dn,
367 struct dn_list *list,
368 enum dn_list_will_be_read_only read_only)
370 struct ldb_message *msg;
371 int ret, version;
372 struct ldb_message_element *el;
373 TDB_DATA rec = {0};
374 struct dn_list *list2;
375 bool from_primary_cache = false;
376 TDB_DATA key = {0};
378 list->dn = NULL;
379 list->count = 0;
380 list->strict = false;
383 * See if we have an in memory index cache
385 if (ldb_kv->idxptr == NULL) {
386 goto normal_index;
389 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
390 key.dsize = strlen((char *)key.dptr);
393 * Have we cached this index record?
394 * If we have a nested transaction cache try that first.
395 * then try the transaction cache.
396 * if the record is not cached it will need to be read from disk.
398 if (ldb_kv->nested_idx_ptr != NULL) {
399 rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
401 if (rec.dptr == NULL) {
402 from_primary_cache = true;
403 rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
405 if (rec.dptr == NULL) {
406 goto normal_index;
409 /* we've found an in-memory index entry */
410 list2 = ldb_kv_index_idxptr(module, rec);
411 if (list2 == NULL) {
412 free(rec.dptr);
413 return LDB_ERR_OPERATIONS_ERROR;
415 free(rec.dptr);
418 * If this is a read only transaction the indexes will not be
419 * changed so we don't need a copy in the event of a rollback
421 * In this case make an early return
423 if (read_only == DN_LIST_WILL_BE_READ_ONLY) {
424 *list = *list2;
425 return LDB_SUCCESS;
429 * record was read from the sub transaction cache, so we have
430 * already copied the primary cache record
432 if (!from_primary_cache) {
433 *list = *list2;
434 return LDB_SUCCESS;
438 * No index sub transaction active, so no need to cache a copy
440 if (ldb_kv->nested_idx_ptr == NULL) {
441 *list = *list2;
442 return LDB_SUCCESS;
446 * There is an active index sub transaction, and the record was
447 * found in the primary index transaction cache. A copy of the
448 * record needs be taken to prevent the original entry being
449 * altered, until the index sub transaction is committed.
453 struct ldb_val *dns = NULL;
454 size_t x = 0;
456 dns = talloc_array(
457 list,
458 struct ldb_val,
459 list2->count);
460 if (dns == NULL) {
461 return LDB_ERR_OPERATIONS_ERROR;
463 for (x = 0; x < list2->count; x++) {
464 dns[x].length = list2->dn[x].length;
465 dns[x].data = talloc_memdup(
466 dns,
467 list2->dn[x].data,
468 list2->dn[x].length);
469 if (dns[x].data == NULL) {
470 TALLOC_FREE(dns);
471 return LDB_ERR_OPERATIONS_ERROR;
474 list->dn = dns;
475 list->count = list2->count;
477 return LDB_SUCCESS;
480 * Index record not found in the caches, read it from the
481 * database.
483 normal_index:
484 msg = ldb_msg_new(list);
485 if (msg == NULL) {
486 return LDB_ERR_OPERATIONS_ERROR;
489 ret = ldb_kv_search_dn1(module,
491 msg,
492 LDB_UNPACK_DATA_FLAG_NO_DN |
494 * The entry point ldb_kv_search_indexed is
495 * only called from the read-locked
496 * ldb_kv_search.
498 LDB_UNPACK_DATA_FLAG_READ_LOCKED);
499 if (ret != LDB_SUCCESS) {
500 talloc_free(msg);
501 return ret;
504 el = ldb_msg_find_element(msg, LDB_KV_IDX);
505 if (!el) {
506 talloc_free(msg);
507 return LDB_SUCCESS;
510 version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
513 * we avoid copying the strings by stealing the list. We have
514 * to steal msg onto el->values (which looks odd) because
515 * the memory is allocated on msg, not on each value.
517 if (ldb_kv->cache->GUID_index_attribute == NULL) {
518 /* check indexing version number */
519 if (version != LDB_KV_INDEXING_VERSION) {
520 ldb_debug_set(ldb_module_get_ctx(module),
521 LDB_DEBUG_ERROR,
522 "Wrong DN index version %d "
523 "expected %d for %s",
524 version, LDB_KV_INDEXING_VERSION,
525 ldb_dn_get_linearized(dn));
526 talloc_free(msg);
527 return LDB_ERR_OPERATIONS_ERROR;
530 talloc_steal(el->values, msg);
531 list->dn = talloc_steal(list, el->values);
532 list->count = el->num_values;
533 } else {
534 unsigned int i;
535 if (version != LDB_KV_GUID_INDEXING_VERSION) {
536 /* This is quite likely during the DB startup
537 on first upgrade to using a GUID index */
538 ldb_debug_set(ldb_module_get_ctx(module),
539 LDB_DEBUG_ERROR,
540 "Wrong GUID index version %d "
541 "expected %d for %s",
542 version, LDB_KV_GUID_INDEXING_VERSION,
543 ldb_dn_get_linearized(dn));
544 talloc_free(msg);
545 return LDB_ERR_OPERATIONS_ERROR;
548 if (el->num_values == 0) {
549 talloc_free(msg);
550 return LDB_ERR_OPERATIONS_ERROR;
553 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
554 talloc_free(msg);
555 return LDB_ERR_OPERATIONS_ERROR;
558 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
559 list->dn = talloc_array(list, struct ldb_val, list->count);
560 if (list->dn == NULL) {
561 talloc_free(msg);
562 return LDB_ERR_OPERATIONS_ERROR;
566 * The actual data is on msg.
568 talloc_steal(list->dn, msg);
569 for (i = 0; i < list->count; i++) {
570 list->dn[i].data
571 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
572 list->dn[i].length = LDB_KV_GUID_SIZE;
576 /* We don't need msg->elements any more */
577 talloc_free(msg->elements);
578 return LDB_SUCCESS;
581 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
582 struct ldb_kv_private *ldb_kv,
583 TALLOC_CTX *mem_ctx,
584 struct ldb_dn *dn,
585 struct ldb_val *ldb_key)
587 struct ldb_context *ldb = ldb_module_get_ctx(module);
588 int ret;
589 int index = 0;
590 enum key_truncation truncation = KEY_NOT_TRUNCATED;
591 struct dn_list *list = talloc(mem_ctx, struct dn_list);
592 if (list == NULL) {
593 ldb_oom(ldb);
594 return LDB_ERR_OPERATIONS_ERROR;
597 ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
598 if (ret != LDB_SUCCESS) {
599 TALLOC_FREE(list);
600 return ret;
603 if (list->count == 0) {
604 TALLOC_FREE(list);
605 return LDB_ERR_NO_SUCH_OBJECT;
608 if (list->count > 1 && truncation == KEY_NOT_TRUNCATED) {
609 const char *dn_str = ldb_dn_get_linearized(dn);
610 ldb_asprintf_errstring(ldb_module_get_ctx(module),
611 __location__
612 ": Failed to read DN index "
613 "against %s for %s: too many "
614 "values (%u > 1)",
615 ldb_kv->cache->GUID_index_attribute,
616 dn_str,
617 list->count);
618 TALLOC_FREE(list);
619 return LDB_ERR_CONSTRAINT_VIOLATION;
622 if (list->count > 0 && truncation == KEY_TRUNCATED) {
624 * DN key has been truncated, need to inspect the actual
625 * records to locate the actual DN
627 unsigned int i;
628 index = -1;
629 for (i=0; i < list->count; i++) {
630 uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
631 struct ldb_val key = {
632 .data = guid_key,
633 .length = sizeof(guid_key)
635 const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
636 struct ldb_message *rec = ldb_msg_new(ldb);
637 if (rec == NULL) {
638 TALLOC_FREE(list);
639 return LDB_ERR_OPERATIONS_ERROR;
642 ret = ldb_kv_idx_to_key(
643 module, ldb_kv, ldb, &list->dn[i], &key);
644 if (ret != LDB_SUCCESS) {
645 TALLOC_FREE(list);
646 TALLOC_FREE(rec);
647 return ret;
650 ret =
651 ldb_kv_search_key(module, ldb_kv, key, rec, flags);
652 if (key.data != guid_key) {
653 TALLOC_FREE(key.data);
655 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
657 * the record has disappeared?
658 * yes, this can happen
660 TALLOC_FREE(rec);
661 continue;
664 if (ret != LDB_SUCCESS) {
665 /* an internal error */
666 TALLOC_FREE(rec);
667 TALLOC_FREE(list);
668 return LDB_ERR_OPERATIONS_ERROR;
672 * We found the actual DN that we wanted from in the
673 * multiple values that matched the index
674 * (due to truncation), so return that.
677 if (ldb_dn_compare(dn, rec->dn) == 0) {
678 index = i;
679 TALLOC_FREE(rec);
680 break;
685 * We matched the index but the actual DN we wanted
686 * was not here.
688 if (index == -1) {
689 TALLOC_FREE(list);
690 return LDB_ERR_NO_SUCH_OBJECT;
694 /* The ldb_key memory is allocated by the caller */
695 ret = ldb_kv_guid_to_key(&list->dn[index], ldb_key);
696 TALLOC_FREE(list);
698 if (ret != LDB_SUCCESS) {
699 return LDB_ERR_OPERATIONS_ERROR;
702 return LDB_SUCCESS;
708 save a dn_list into a full @IDX style record
710 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
711 struct ldb_kv_private *ldb_kv,
712 struct ldb_dn *dn,
713 struct dn_list *list)
715 struct ldb_message *msg;
716 int ret;
718 msg = ldb_msg_new(module);
719 if (!msg) {
720 return ldb_module_oom(module);
723 msg->dn = dn;
725 if (list->count == 0) {
726 ret = ldb_kv_delete_noindex(module, msg);
727 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
728 ret = LDB_SUCCESS;
730 talloc_free(msg);
731 return ret;
734 if (ldb_kv->cache->GUID_index_attribute == NULL) {
735 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
736 LDB_KV_INDEXING_VERSION);
737 if (ret != LDB_SUCCESS) {
738 talloc_free(msg);
739 return ldb_module_oom(module);
741 } else {
742 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
743 LDB_KV_GUID_INDEXING_VERSION);
744 if (ret != LDB_SUCCESS) {
745 talloc_free(msg);
746 return ldb_module_oom(module);
750 if (list->count > 0) {
751 struct ldb_message_element *el;
753 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
754 if (ret != LDB_SUCCESS) {
755 talloc_free(msg);
756 return ldb_module_oom(module);
759 if (ldb_kv->cache->GUID_index_attribute == NULL) {
760 el->values = list->dn;
761 el->num_values = list->count;
762 } else {
763 struct ldb_val v;
764 unsigned int i;
765 el->values = talloc_array(msg,
766 struct ldb_val, 1);
767 if (el->values == NULL) {
768 talloc_free(msg);
769 return ldb_module_oom(module);
772 v.data = talloc_array_size(el->values,
773 list->count,
774 LDB_KV_GUID_SIZE);
775 if (v.data == NULL) {
776 talloc_free(msg);
777 return ldb_module_oom(module);
780 v.length = talloc_get_size(v.data);
782 for (i = 0; i < list->count; i++) {
783 if (list->dn[i].length !=
784 LDB_KV_GUID_SIZE) {
785 talloc_free(msg);
786 return ldb_module_operr(module);
788 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
789 list->dn[i].data,
790 LDB_KV_GUID_SIZE);
792 el->values[0] = v;
793 el->num_values = 1;
797 ret = ldb_kv_store(module, msg, TDB_REPLACE);
798 talloc_free(msg);
799 return ret;
803 save a dn_list into the database, in either @IDX or internal format
805 static int ldb_kv_dn_list_store(struct ldb_module *module,
806 struct ldb_dn *dn,
807 struct dn_list *list)
809 struct ldb_kv_private *ldb_kv = talloc_get_type(
810 ldb_module_get_private(module), struct ldb_kv_private);
811 TDB_DATA rec = {0};
812 TDB_DATA key = {0};
814 int ret = LDB_SUCCESS;
815 struct dn_list *list2 = NULL;
816 struct ldb_kv_idxptr *idxptr = NULL;
818 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
819 if (key.dptr == NULL) {
820 return LDB_ERR_OPERATIONS_ERROR;
822 key.dsize = strlen((char *)key.dptr);
825 * If there is an index sub transaction active, update the
826 * sub transaction index cache. Otherwise update the
827 * primary index cache
829 if (ldb_kv->nested_idx_ptr != NULL) {
830 idxptr = ldb_kv->nested_idx_ptr;
831 } else {
832 idxptr = ldb_kv->idxptr;
835 * Get the cache entry for the index
837 * As the value in the cache is a pointer to a dn_list we update
838 * the dn_list directly.
841 rec = tdb_fetch(idxptr->itdb, key);
842 if (rec.dptr != NULL) {
843 list2 = ldb_kv_index_idxptr(module, rec);
844 if (list2 == NULL) {
845 free(rec.dptr);
846 return LDB_ERR_OPERATIONS_ERROR;
848 free(rec.dptr);
849 /* Now put the updated pointer back in the cache */
850 if (list->dn == NULL) {
851 list2->dn = NULL;
852 list2->count = 0;
853 } else {
854 list2->dn = talloc_steal(list2, list->dn);
855 list2->count = list->count;
857 return LDB_SUCCESS;
860 list2 = talloc(idxptr, struct dn_list);
861 if (list2 == NULL) {
862 return LDB_ERR_OPERATIONS_ERROR;
864 list2->dn = talloc_steal(list2, list->dn);
865 list2->count = list->count;
867 rec.dptr = (uint8_t *)&list2;
868 rec.dsize = sizeof(void *);
872 * This is not a store into the main DB, but into an in-memory
873 * TDB, so we don't need a guard on ltdb->read_only
875 * Also as we directly update the in memory dn_list for existing
876 * cache entries we must be adding a new entry to the cache.
878 ret = tdb_store(idxptr->itdb, key, rec, TDB_INSERT);
879 if (ret != 0) {
880 return ltdb_err_map( tdb_error(idxptr->itdb));
882 return LDB_SUCCESS;
886 traverse function for storing the in-memory index entries on disk
888 static int ldb_kv_index_traverse_store(_UNUSED_ struct tdb_context *tdb,
889 TDB_DATA key,
890 TDB_DATA data,
891 void *state)
893 struct ldb_module *module = state;
894 struct ldb_kv_private *ldb_kv = talloc_get_type(
895 ldb_module_get_private(module), struct ldb_kv_private);
896 struct ldb_dn *dn;
897 struct ldb_context *ldb = ldb_module_get_ctx(module);
898 struct ldb_val v;
899 struct dn_list *list;
901 list = ldb_kv_index_idxptr(module, data);
902 if (list == NULL) {
903 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
904 return -1;
907 v.data = key.dptr;
908 v.length = strnlen((char *)key.dptr, key.dsize);
910 dn = ldb_dn_from_ldb_val(module, ldb, &v);
911 if (dn == NULL) {
912 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
913 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
914 return -1;
917 ldb_kv->idxptr->error =
918 ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
919 talloc_free(dn);
920 if (ldb_kv->idxptr->error != 0) {
921 return -1;
923 return 0;
926 /* cleanup the idxptr mode when transaction commits */
927 int ldb_kv_index_transaction_commit(struct ldb_module *module)
929 struct ldb_kv_private *ldb_kv = talloc_get_type(
930 ldb_module_get_private(module), struct ldb_kv_private);
931 int ret;
933 struct ldb_context *ldb = ldb_module_get_ctx(module);
935 ldb_reset_err_string(ldb);
937 if (ldb_kv->idxptr->itdb) {
938 tdb_traverse(
939 ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
940 tdb_close(ldb_kv->idxptr->itdb);
943 ret = ldb_kv->idxptr->error;
944 if (ret != LDB_SUCCESS) {
945 if (!ldb_errstring(ldb)) {
946 ldb_set_errstring(ldb, ldb_strerror(ret));
948 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
951 talloc_free(ldb_kv->idxptr);
952 ldb_kv->idxptr = NULL;
953 return ret;
956 /* cleanup the idxptr mode when transaction cancels */
957 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
959 struct ldb_kv_private *ldb_kv = talloc_get_type(
960 ldb_module_get_private(module), struct ldb_kv_private);
961 if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
962 tdb_close(ldb_kv->idxptr->itdb);
964 TALLOC_FREE(ldb_kv->idxptr);
965 if (ldb_kv->nested_idx_ptr && ldb_kv->nested_idx_ptr->itdb) {
966 tdb_close(ldb_kv->nested_idx_ptr->itdb);
968 TALLOC_FREE(ldb_kv->nested_idx_ptr);
969 return LDB_SUCCESS;
974 return the dn key to be used for an index
975 the caller is responsible for freeing
977 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
978 struct ldb_kv_private *ldb_kv,
979 const char *attr,
980 const struct ldb_val *value,
981 const struct ldb_schema_attribute **ap,
982 enum key_truncation *truncation)
984 struct ldb_dn *ret;
985 struct ldb_val v;
986 const struct ldb_schema_attribute *a = NULL;
987 char *attr_folded = NULL;
988 const char *attr_for_dn = NULL;
989 int r;
990 bool should_b64_encode;
992 unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
993 size_t key_len = 0;
994 size_t attr_len = 0;
995 const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
996 unsigned frmt_len = 0;
997 const size_t additional_key_length = 4;
998 unsigned int num_separators = 3; /* Estimate for overflow check */
999 const size_t min_data = 1;
1000 const size_t min_key_length = additional_key_length
1001 + indx_len + num_separators + min_data;
1002 struct ldb_val empty;
1005 * Accept a NULL value as a request for a key with no value. This is
1006 * different from passing an empty value, which might be given
1007 * significance by some canonicalise functions.
1009 bool empty_val = value == NULL;
1010 if (empty_val) {
1011 empty.length = 0;
1012 empty.data = discard_const_p(unsigned char, "");
1013 value = &empty;
1016 if (attr[0] == '@') {
1017 attr_for_dn = attr;
1018 v = *value;
1019 if (ap != NULL) {
1020 *ap = NULL;
1022 } else {
1023 attr_folded = ldb_attr_casefold(ldb, attr);
1024 if (!attr_folded) {
1025 return NULL;
1028 attr_for_dn = attr_folded;
1030 a = ldb_schema_attribute_by_name(ldb, attr);
1031 if (ap) {
1032 *ap = a;
1035 if (empty_val) {
1036 v = *value;
1037 } else {
1038 ldb_attr_handler_t fn;
1039 if (a->syntax->index_format_fn &&
1040 ldb_kv->cache->GUID_index_attribute != NULL) {
1041 fn = a->syntax->index_format_fn;
1042 } else {
1043 fn = a->syntax->canonicalise_fn;
1045 r = fn(ldb, ldb, value, &v);
1046 if (r != LDB_SUCCESS) {
1047 const char *errstr = ldb_errstring(ldb);
1048 /* canonicalisation can be refused. For
1049 example, a attribute that takes wildcards
1050 will refuse to canonicalise if the value
1051 contains a wildcard */
1052 ldb_asprintf_errstring(ldb,
1053 "Failed to create "
1054 "index key for "
1055 "attribute '%s':%s%s%s",
1056 attr, ldb_strerror(r),
1057 (errstr?":":""),
1058 (errstr?errstr:""));
1059 talloc_free(attr_folded);
1060 return NULL;
1064 attr_len = strlen(attr_for_dn);
1067 * Check if there is any hope this will fit into the DB.
1068 * Overflow here is not actually critical the code below
1069 * checks again to make the printf and the DB does another
1070 * check for too long keys
1072 if (max_key_length - attr_len < min_key_length) {
1073 ldb_asprintf_errstring(
1074 ldb,
1075 __location__ ": max_key_length "
1076 "is too small (%u) < (%u)",
1077 max_key_length,
1078 (unsigned)(min_key_length + attr_len));
1079 talloc_free(attr_folded);
1080 return NULL;
1084 * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
1085 * "DN=" and a trailing string terminator
1087 max_key_length -= additional_key_length;
1090 * We do not base 64 encode a DN in a key, it has already been
1091 * casefold and lineraized, that is good enough. That already
1092 * avoids embedded NUL etc.
1094 if (ldb_kv->cache->GUID_index_attribute != NULL) {
1095 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
1096 should_b64_encode = false;
1097 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
1099 * We can only change the behaviour for IDXONE
1100 * when the GUID index is enabled
1102 should_b64_encode = false;
1103 } else {
1104 should_b64_encode
1105 = ldb_should_b64_encode(ldb, &v);
1107 } else {
1108 should_b64_encode = ldb_should_b64_encode(ldb, &v);
1111 if (should_b64_encode) {
1112 size_t vstr_len = 0;
1113 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
1114 if (!vstr) {
1115 talloc_free(attr_folded);
1116 return NULL;
1118 vstr_len = strlen(vstr);
1120 * Overflow here is not critical as we only use this
1121 * to choose the printf truncation
1123 key_len = num_separators + indx_len + attr_len + vstr_len;
1124 if (key_len > max_key_length) {
1125 size_t excess = key_len - max_key_length;
1126 frmt_len = vstr_len - excess;
1127 *truncation = KEY_TRUNCATED;
1129 * Truncated keys are placed in a separate key space
1130 * from the non truncated keys
1131 * Note: the double hash "##" is not a typo and
1132 * indicates that the following value is base64 encoded
1134 ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
1135 LDB_KV_INDEX, attr_for_dn,
1136 frmt_len, vstr);
1137 } else {
1138 frmt_len = vstr_len;
1139 *truncation = KEY_NOT_TRUNCATED;
1141 * Note: the double colon "::" is not a typo and
1142 * indicates that the following value is base64 encoded
1144 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1145 LDB_KV_INDEX, attr_for_dn,
1146 frmt_len, vstr);
1148 talloc_free(vstr);
1149 } else {
1150 /* Only need two seperators */
1151 num_separators = 2;
1154 * Overflow here is not critical as we only use this
1155 * to choose the printf truncation
1157 key_len = num_separators + indx_len + attr_len + (int)v.length;
1158 if (key_len > max_key_length) {
1159 size_t excess = key_len - max_key_length;
1160 frmt_len = v.length - excess;
1161 *truncation = KEY_TRUNCATED;
1163 * Truncated keys are placed in a separate key space
1164 * from the non truncated keys
1166 ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1167 LDB_KV_INDEX, attr_for_dn,
1168 frmt_len, (char *)v.data);
1169 } else {
1170 frmt_len = v.length;
1171 *truncation = KEY_NOT_TRUNCATED;
1172 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1173 LDB_KV_INDEX, attr_for_dn,
1174 frmt_len, (char *)v.data);
1178 if (v.data != value->data && !empty_val) {
1179 talloc_free(v.data);
1181 talloc_free(attr_folded);
1183 return ret;
1187 see if a attribute value is in the list of indexed attributes
1189 static bool ldb_kv_is_indexed(struct ldb_module *module,
1190 struct ldb_kv_private *ldb_kv,
1191 const char *attr)
1193 struct ldb_context *ldb = ldb_module_get_ctx(module);
1194 unsigned int i;
1195 struct ldb_message_element *el;
1197 if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1198 (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1199 /* Implicity covered, this is the index key */
1200 return false;
1202 if (ldb->schema.index_handler_override) {
1203 const struct ldb_schema_attribute *a
1204 = ldb_schema_attribute_by_name(ldb, attr);
1206 if (a == NULL) {
1207 return false;
1210 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1211 return true;
1212 } else {
1213 return false;
1217 if (!ldb_kv->cache->attribute_indexes) {
1218 return false;
1221 el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1222 if (el == NULL) {
1223 return false;
1226 /* TODO: this is too expensive! At least use a binary search */
1227 for (i=0; i<el->num_values; i++) {
1228 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1229 return true;
1232 return false;
1236 in the following logic functions, the return value is treated as
1237 follows:
1239 LDB_SUCCESS: we found some matching index values
1241 LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1243 LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1244 we'll need a full search
1248 return a list of dn's that might match a simple indexed search (an
1249 equality search only)
1251 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1252 struct ldb_kv_private *ldb_kv,
1253 const struct ldb_parse_tree *tree,
1254 struct dn_list *list)
1256 struct ldb_context *ldb;
1257 struct ldb_dn *dn;
1258 int ret;
1259 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1261 ldb = ldb_module_get_ctx(module);
1263 list->count = 0;
1264 list->dn = NULL;
1266 /* if the attribute isn't in the list of indexed attributes then
1267 this node needs a full search */
1268 if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1269 return LDB_ERR_OPERATIONS_ERROR;
1272 /* the attribute is indexed. Pull the list of DNs that match the
1273 search criterion */
1274 dn = ldb_kv_index_key(ldb,
1275 ldb_kv,
1276 tree->u.equality.attr,
1277 &tree->u.equality.value,
1278 NULL,
1279 &truncation);
1281 * We ignore truncation here and allow multi-valued matches
1282 * as ltdb_search_indexed will filter out the wrong one in
1283 * ltdb_index_filter() which calls ldb_match_message().
1285 if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1287 ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list,
1288 DN_LIST_WILL_BE_READ_ONLY);
1289 talloc_free(dn);
1290 return ret;
1293 static bool list_union(struct ldb_context *ldb,
1294 struct ldb_kv_private *ldb_kv,
1295 struct dn_list *list,
1296 struct dn_list *list2);
1299 return a list of dn's that might match a leaf indexed search
1301 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1302 struct ldb_kv_private *ldb_kv,
1303 const struct ldb_parse_tree *tree,
1304 struct dn_list *list)
1306 if (ldb_kv->disallow_dn_filter &&
1307 (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1308 /* in AD mode we do not support "(dn=...)" search filters */
1309 list->dn = NULL;
1310 list->count = 0;
1311 return LDB_SUCCESS;
1313 if (tree->u.equality.attr[0] == '@') {
1314 /* Do not allow a indexed search against an @ */
1315 list->dn = NULL;
1316 list->count = 0;
1317 return LDB_SUCCESS;
1319 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1320 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1321 bool valid_dn = false;
1322 struct ldb_dn *dn
1323 = ldb_dn_from_ldb_val(list,
1324 ldb_module_get_ctx(module),
1325 &tree->u.equality.value);
1326 if (dn == NULL) {
1327 /* If we can't parse it, no match */
1328 list->dn = NULL;
1329 list->count = 0;
1330 return LDB_SUCCESS;
1333 valid_dn = ldb_dn_validate(dn);
1334 if (valid_dn == false) {
1335 /* If we can't parse it, no match */
1336 list->dn = NULL;
1337 list->count = 0;
1338 return LDB_SUCCESS;
1342 * Re-use the same code we use for a SCOPE_BASE
1343 * search
1345 * We can't call TALLOC_FREE(dn) as this must belong
1346 * to list for the memory to remain valid.
1348 return ldb_kv_index_dn_base_dn(
1349 module, ldb_kv, dn, list, &truncation);
1351 * We ignore truncation here and allow multi-valued matches
1352 * as ltdb_search_indexed will filter out the wrong one in
1353 * ltdb_index_filter() which calls ldb_match_message().
1356 } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1357 (ldb_attr_cmp(tree->u.equality.attr,
1358 ldb_kv->cache->GUID_index_attribute) == 0)) {
1359 int ret;
1360 struct ldb_context *ldb = ldb_module_get_ctx(module);
1361 list->dn = talloc_array(list, struct ldb_val, 1);
1362 if (list->dn == NULL) {
1363 ldb_module_oom(module);
1364 return LDB_ERR_OPERATIONS_ERROR;
1367 * We need to go via the canonicalise_fn() to
1368 * ensure we get the index in binary, rather
1369 * than a string
1371 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1372 ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1373 if (ret != LDB_SUCCESS) {
1374 return LDB_ERR_OPERATIONS_ERROR;
1376 list->count = 1;
1377 return LDB_SUCCESS;
1380 return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1385 list intersection
1386 list = list & list2
1388 static bool list_intersect(struct ldb_kv_private *ldb_kv,
1389 struct dn_list *list,
1390 const struct dn_list *list2)
1392 const struct dn_list *short_list, *long_list;
1393 struct dn_list *list3;
1394 unsigned int i;
1396 if (list->count == 0) {
1397 /* 0 & X == 0 */
1398 return true;
1400 if (list2->count == 0) {
1401 /* X & 0 == 0 */
1402 list->count = 0;
1403 list->dn = NULL;
1404 return true;
1408 * In both of the below we check for strict and in that
1409 * case do not optimise the intersection of this list,
1410 * we must never return an entry not in this
1411 * list. This allows the index for
1412 * SCOPE_ONELEVEL to be trusted.
1415 /* the indexing code is allowed to return a longer list than
1416 what really matches, as all results are filtered by the
1417 full expression at the end - this shortcut avoids a lot of
1418 work in some cases */
1419 if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1420 return true;
1422 if (list2->count < 2 && list->count > 10 && list->strict == false) {
1423 list->count = list2->count;
1424 list->dn = list2->dn;
1425 /* note that list2 may not be the parent of list2->dn,
1426 as list2->dn may be owned by ltdb->idxptr. In that
1427 case we expect this reparent call to fail, which is
1428 OK */
1429 talloc_reparent(list2, list, list2->dn);
1430 return true;
1433 if (list->count > list2->count) {
1434 short_list = list2;
1435 long_list = list;
1436 } else {
1437 short_list = list;
1438 long_list = list2;
1441 list3 = talloc_zero(list, struct dn_list);
1442 if (list3 == NULL) {
1443 return false;
1446 list3->dn = talloc_array(list3, struct ldb_val,
1447 MIN(list->count, list2->count));
1448 if (!list3->dn) {
1449 talloc_free(list3);
1450 return false;
1452 list3->count = 0;
1454 for (i=0;i<short_list->count;i++) {
1455 /* For the GUID index case, this is a binary search */
1456 if (ldb_kv_dn_list_find_val(
1457 ldb_kv, long_list, &short_list->dn[i]) != -1) {
1458 list3->dn[list3->count] = short_list->dn[i];
1459 list3->count++;
1463 list->strict |= list2->strict;
1464 list->dn = talloc_steal(list, list3->dn);
1465 list->count = list3->count;
1466 talloc_free(list3);
1468 return true;
1473 list union
1474 list = list | list2
1476 static bool list_union(struct ldb_context *ldb,
1477 struct ldb_kv_private *ldb_kv,
1478 struct dn_list *list,
1479 struct dn_list *list2)
1481 struct ldb_val *dn3;
1482 unsigned int i = 0, j = 0, k = 0;
1484 if (list2->count == 0) {
1485 /* X | 0 == X */
1486 return true;
1489 if (list->count == 0) {
1490 /* 0 | X == X */
1491 list->count = list2->count;
1492 list->dn = list2->dn;
1493 /* note that list2 may not be the parent of list2->dn,
1494 as list2->dn may be owned by ltdb->idxptr. In that
1495 case we expect this reparent call to fail, which is
1496 OK */
1497 talloc_reparent(list2, list, list2->dn);
1498 return true;
1502 * Sort the lists (if not in GUID DN mode) so we can do
1503 * the de-duplication during the merge
1505 * NOTE: This can sort the in-memory index values, as list or
1506 * list2 might not be a copy!
1508 ldb_kv_dn_list_sort(ldb_kv, list);
1509 ldb_kv_dn_list_sort(ldb_kv, list2);
1511 dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1512 if (!dn3) {
1513 ldb_oom(ldb);
1514 return false;
1517 while (i < list->count || j < list2->count) {
1518 int cmp;
1519 if (i >= list->count) {
1520 cmp = 1;
1521 } else if (j >= list2->count) {
1522 cmp = -1;
1523 } else {
1524 cmp = ldb_val_equal_exact_ordered(list->dn[i],
1525 &list2->dn[j]);
1528 if (cmp < 0) {
1529 /* Take list */
1530 dn3[k] = list->dn[i];
1531 i++;
1532 k++;
1533 } else if (cmp > 0) {
1534 /* Take list2 */
1535 dn3[k] = list2->dn[j];
1536 j++;
1537 k++;
1538 } else {
1539 /* Equal, take list */
1540 dn3[k] = list->dn[i];
1541 i++;
1542 j++;
1543 k++;
1547 list->dn = dn3;
1548 list->count = k;
1550 return true;
1553 static int ldb_kv_index_dn(struct ldb_module *module,
1554 struct ldb_kv_private *ldb_kv,
1555 const struct ldb_parse_tree *tree,
1556 struct dn_list *list);
1559 process an OR list (a union)
1561 static int ldb_kv_index_dn_or(struct ldb_module *module,
1562 struct ldb_kv_private *ldb_kv,
1563 const struct ldb_parse_tree *tree,
1564 struct dn_list *list)
1566 struct ldb_context *ldb;
1567 unsigned int i;
1569 ldb = ldb_module_get_ctx(module);
1571 list->dn = NULL;
1572 list->count = 0;
1574 for (i=0; i<tree->u.list.num_elements; i++) {
1575 struct dn_list *list2;
1576 int ret;
1578 list2 = talloc_zero(list, struct dn_list);
1579 if (list2 == NULL) {
1580 return LDB_ERR_OPERATIONS_ERROR;
1583 ret = ldb_kv_index_dn(
1584 module, ldb_kv, tree->u.list.elements[i], list2);
1586 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1587 /* X || 0 == X */
1588 talloc_free(list2);
1589 continue;
1592 if (ret != LDB_SUCCESS) {
1593 /* X || * == * */
1594 talloc_free(list2);
1595 return ret;
1598 if (!list_union(ldb, ldb_kv, list, list2)) {
1599 talloc_free(list2);
1600 return LDB_ERR_OPERATIONS_ERROR;
1604 if (list->count == 0) {
1605 return LDB_ERR_NO_SUCH_OBJECT;
1608 return LDB_SUCCESS;
1613 NOT an index results
1615 static int ldb_kv_index_dn_not(_UNUSED_ struct ldb_module *module,
1616 _UNUSED_ struct ldb_kv_private *ldb_kv,
1617 _UNUSED_ const struct ldb_parse_tree *tree,
1618 _UNUSED_ struct dn_list *list)
1620 /* the only way to do an indexed not would be if we could
1621 negate the not via another not or if we knew the total
1622 number of database elements so we could know that the
1623 existing expression covered the whole database.
1625 instead, we just give up, and rely on a full index scan
1626 (unless an outer & manages to reduce the list)
1628 return LDB_ERR_OPERATIONS_ERROR;
1632 * These things are unique, so avoid a full scan if this is a search
1633 * by GUID, DN or a unique attribute
1635 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1636 struct ldb_kv_private *ldb_kv,
1637 const char *attr)
1639 const struct ldb_schema_attribute *a;
1640 if (ldb_kv->cache->GUID_index_attribute != NULL) {
1641 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1642 0) {
1643 return true;
1646 if (ldb_attr_dn(attr) == 0) {
1647 return true;
1650 a = ldb_schema_attribute_by_name(ldb, attr);
1651 if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1652 return true;
1654 return false;
1658 process an AND expression (intersection)
1660 static int ldb_kv_index_dn_and(struct ldb_module *module,
1661 struct ldb_kv_private *ldb_kv,
1662 const struct ldb_parse_tree *tree,
1663 struct dn_list *list)
1665 struct ldb_context *ldb;
1666 unsigned int i;
1667 bool found;
1669 ldb = ldb_module_get_ctx(module);
1671 list->dn = NULL;
1672 list->count = 0;
1674 /* in the first pass we only look for unique simple
1675 equality tests, in the hope of avoiding having to look
1676 at any others */
1677 for (i=0; i<tree->u.list.num_elements; i++) {
1678 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1679 int ret;
1681 if (subtree->operation != LDB_OP_EQUALITY ||
1682 !ldb_kv_index_unique(
1683 ldb, ldb_kv, subtree->u.equality.attr)) {
1684 continue;
1687 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1688 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1689 /* 0 && X == 0 */
1690 return LDB_ERR_NO_SUCH_OBJECT;
1692 if (ret == LDB_SUCCESS) {
1693 /* a unique index match means we can
1694 * stop. Note that we don't care if we return
1695 * a few too many objects, due to later
1696 * filtering */
1697 return LDB_SUCCESS;
1701 /* now do a full intersection */
1702 found = false;
1704 for (i=0; i<tree->u.list.num_elements; i++) {
1705 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1706 struct dn_list *list2;
1707 int ret;
1709 list2 = talloc_zero(list, struct dn_list);
1710 if (list2 == NULL) {
1711 return ldb_module_oom(module);
1714 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1716 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1717 /* X && 0 == 0 */
1718 list->dn = NULL;
1719 list->count = 0;
1720 talloc_free(list2);
1721 return LDB_ERR_NO_SUCH_OBJECT;
1724 if (ret != LDB_SUCCESS) {
1725 /* this didn't adding anything */
1726 talloc_free(list2);
1727 continue;
1730 if (!found) {
1731 talloc_reparent(list2, list, list->dn);
1732 list->dn = list2->dn;
1733 list->count = list2->count;
1734 found = true;
1735 } else if (!list_intersect(ldb_kv, list, list2)) {
1736 talloc_free(list2);
1737 return LDB_ERR_OPERATIONS_ERROR;
1740 if (list->count == 0) {
1741 list->dn = NULL;
1742 return LDB_ERR_NO_SUCH_OBJECT;
1745 if (list->count < 2) {
1746 /* it isn't worth loading the next part of the tree */
1747 return LDB_SUCCESS;
1751 if (!found) {
1752 /* none of the attributes were indexed */
1753 return LDB_ERR_OPERATIONS_ERROR;
1756 return LDB_SUCCESS;
1759 struct ldb_kv_ordered_index_context {
1760 struct ldb_module *module;
1761 int error;
1762 struct dn_list *dn_list;
1765 static int traverse_range_index(_UNUSED_ struct ldb_kv_private *ldb_kv,
1766 _UNUSED_ struct ldb_val key,
1767 struct ldb_val data,
1768 void *state)
1771 struct ldb_context *ldb;
1772 struct ldb_kv_ordered_index_context *ctx =
1773 (struct ldb_kv_ordered_index_context *)state;
1774 struct ldb_module *module = ctx->module;
1775 struct ldb_message_element *el = NULL;
1776 struct ldb_message *msg = NULL;
1777 int version;
1778 size_t dn_array_size, additional_length;
1779 unsigned int i;
1781 ldb = ldb_module_get_ctx(module);
1783 msg = ldb_msg_new(module);
1785 ctx->error = ldb_unpack_data_flags(ldb, &data, msg,
1786 LDB_UNPACK_DATA_FLAG_NO_DN);
1788 if (ctx->error != LDB_SUCCESS) {
1789 talloc_free(msg);
1790 return ctx->error;
1793 el = ldb_msg_find_element(msg, LDB_KV_IDX);
1794 if (!el) {
1795 talloc_free(msg);
1796 return LDB_SUCCESS;
1799 version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
1802 * we avoid copying the strings by stealing the list. We have
1803 * to steal msg onto el->values (which looks odd) because
1804 * the memory is allocated on msg, not on each value.
1806 if (version != LDB_KV_GUID_INDEXING_VERSION) {
1807 /* This is quite likely during the DB startup
1808 on first upgrade to using a GUID index */
1809 ldb_debug_set(ldb_module_get_ctx(module),
1810 LDB_DEBUG_ERROR, __location__
1811 ": Wrong GUID index version %d expected %d",
1812 version, LDB_KV_GUID_INDEXING_VERSION);
1813 talloc_free(msg);
1814 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1815 return ctx->error;
1818 if (el->num_values == 0) {
1819 talloc_free(msg);
1820 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1821 return ctx->error;
1824 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0
1825 || el->values[0].length == 0) {
1826 talloc_free(msg);
1827 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1828 return ctx->error;
1831 dn_array_size = talloc_array_length(ctx->dn_list->dn);
1833 additional_length = el->values[0].length / LDB_KV_GUID_SIZE;
1835 if (ctx->dn_list->count + additional_length < ctx->dn_list->count) {
1836 talloc_free(msg);
1837 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1838 return ctx->error;
1841 if ((ctx->dn_list->count + additional_length) >= dn_array_size) {
1842 size_t new_array_length;
1844 if (dn_array_size * 2 < dn_array_size) {
1845 talloc_free(msg);
1846 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1847 return ctx->error;
1850 new_array_length = MAX(ctx->dn_list->count + additional_length,
1851 dn_array_size * 2);
1853 ctx->dn_list->dn = talloc_realloc(ctx->dn_list,
1854 ctx->dn_list->dn,
1855 struct ldb_val,
1856 new_array_length);
1859 if (ctx->dn_list->dn == NULL) {
1860 talloc_free(msg);
1861 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1862 return ctx->error;
1866 * The actual data is on msg.
1868 talloc_steal(ctx->dn_list->dn, msg);
1869 for (i = 0; i < additional_length; i++) {
1870 ctx->dn_list->dn[i + ctx->dn_list->count].data
1871 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
1872 ctx->dn_list->dn[i + ctx->dn_list->count].length = LDB_KV_GUID_SIZE;
1876 ctx->dn_list->count += additional_length;
1878 talloc_free(msg->elements);
1880 return LDB_SUCCESS;
1884 * >= and <= indexing implemented using lexicographically sorted keys
1886 * We only run this in GUID indexing mode and when there is no write
1887 * transaction (only implicit read locks are being held). Otherwise, we would
1888 * have to deal with the in-memory index cache.
1890 * We rely on the implementation of index_format_fn on a schema syntax which
1891 * will can help us to construct keys which can be ordered correctly, and we
1892 * terminate using schema agnostic start and end keys.
1894 * index_format_fn must output values which can be memcmp-able to produce the
1895 * correct ordering as defined by the schema syntax class.
1897 static int ldb_kv_index_dn_ordered(struct ldb_module *module,
1898 struct ldb_kv_private *ldb_kv,
1899 const struct ldb_parse_tree *tree,
1900 struct dn_list *list, bool ascending)
1902 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1903 struct ldb_context *ldb = ldb_module_get_ctx(module);
1905 struct ldb_val ldb_key = { 0 }, ldb_key2 = { 0 };
1906 struct ldb_val start_key, end_key;
1907 struct ldb_dn *key_dn = NULL;
1908 const struct ldb_schema_attribute *a = NULL;
1910 struct ldb_kv_ordered_index_context ctx;
1911 int ret;
1913 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1915 if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.comparison.attr)) {
1916 return LDB_ERR_OPERATIONS_ERROR;
1919 if (ldb_kv->cache->GUID_index_attribute == NULL) {
1920 return LDB_ERR_OPERATIONS_ERROR;
1923 /* bail out if we're in a transaction, full search instead. */
1924 if (ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1925 return LDB_ERR_OPERATIONS_ERROR;
1928 if (ldb_kv->disallow_dn_filter &&
1929 (ldb_attr_cmp(tree->u.comparison.attr, "dn") == 0)) {
1930 /* in AD mode we do not support "(dn=...)" search filters */
1931 list->dn = NULL;
1932 list->count = 0;
1933 return LDB_SUCCESS;
1935 if (tree->u.comparison.attr[0] == '@') {
1936 /* Do not allow a indexed search against an @ */
1937 list->dn = NULL;
1938 list->count = 0;
1939 return LDB_SUCCESS;
1942 a = ldb_schema_attribute_by_name(ldb, tree->u.comparison.attr);
1945 * If there's no index format function defined for this attr, then
1946 * the lexicographic order in the database doesn't correspond to the
1947 * attr's ordering, so we can't use the iterate_range op.
1949 if (a->syntax->index_format_fn == NULL) {
1950 return LDB_ERR_OPERATIONS_ERROR;
1953 key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1954 &tree->u.comparison.value,
1955 NULL, &truncation);
1956 if (!key_dn) {
1957 return LDB_ERR_OPERATIONS_ERROR;
1958 } else if (truncation == KEY_TRUNCATED) {
1959 ldb_debug(ldb, LDB_DEBUG_WARNING,
1960 __location__
1961 ": ordered index violation: key dn truncated: %s\n",
1962 ldb_dn_get_linearized(key_dn));
1963 return LDB_ERR_OPERATIONS_ERROR;
1965 ldb_key = ldb_kv_key_dn(tmp_ctx, key_dn);
1966 talloc_free(key_dn);
1967 if (ldb_key.data == NULL) {
1968 return LDB_ERR_OPERATIONS_ERROR;
1971 key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1972 NULL, NULL, &truncation);
1973 if (!key_dn) {
1974 return LDB_ERR_OPERATIONS_ERROR;
1975 } else if (truncation == KEY_TRUNCATED) {
1976 ldb_debug(ldb, LDB_DEBUG_WARNING,
1977 __location__
1978 ": ordered index violation: key dn truncated: %s\n",
1979 ldb_dn_get_linearized(key_dn));
1980 return LDB_ERR_OPERATIONS_ERROR;
1983 ldb_key2 = ldb_kv_key_dn(tmp_ctx, key_dn);
1984 talloc_free(key_dn);
1985 if (ldb_key2.data == NULL) {
1986 return LDB_ERR_OPERATIONS_ERROR;
1990 * In order to avoid defining a start and end key for the search, we
1991 * notice that each index key is of the form:
1993 * DN=@INDEX:<ATTRIBUTE>:<VALUE>\0.
1995 * We can simply make our start key DN=@INDEX:<ATTRIBUTE>: and our end
1996 * key DN=@INDEX:<ATTRIBUTE>; to return all index entries for a
1997 * particular attribute.
1999 * Our LMDB backend uses the default memcmp for key comparison.
2002 /* Eliminate NUL byte at the end of the empty key */
2003 ldb_key2.length--;
2005 if (ascending) {
2006 /* : becomes ; for pseudo end-key */
2007 ldb_key2.data[ldb_key2.length-1]++;
2008 start_key = ldb_key;
2009 end_key = ldb_key2;
2010 } else {
2011 start_key = ldb_key2;
2012 end_key = ldb_key;
2015 ctx.module = module;
2016 ctx.error = 0;
2017 ctx.dn_list = list;
2018 ctx.dn_list->count = 0;
2019 ctx.dn_list->dn = talloc_zero_array(ctx.dn_list, struct ldb_val, 2);
2021 ret = ldb_kv->kv_ops->iterate_range(ldb_kv, start_key, end_key,
2022 traverse_range_index, &ctx);
2024 if (ret != LDB_SUCCESS || ctx.error != LDB_SUCCESS) {
2025 return LDB_ERR_OPERATIONS_ERROR;
2028 TYPESAFE_QSORT(ctx.dn_list->dn, ctx.dn_list->count,
2029 ldb_val_equal_exact_for_qsort);
2031 talloc_free(tmp_ctx);
2033 return LDB_SUCCESS;
2036 static int ldb_kv_index_dn_greater(struct ldb_module *module,
2037 struct ldb_kv_private *ldb_kv,
2038 const struct ldb_parse_tree *tree,
2039 struct dn_list *list)
2041 return ldb_kv_index_dn_ordered(module,
2042 ldb_kv,
2043 tree,
2044 list, true);
2047 static int ldb_kv_index_dn_less(struct ldb_module *module,
2048 struct ldb_kv_private *ldb_kv,
2049 const struct ldb_parse_tree *tree,
2050 struct dn_list *list)
2052 return ldb_kv_index_dn_ordered(module,
2053 ldb_kv,
2054 tree,
2055 list, false);
2059 return a list of matching objects using a one-level index
2061 static int ldb_kv_index_dn_attr(struct ldb_module *module,
2062 struct ldb_kv_private *ldb_kv,
2063 const char *attr,
2064 struct ldb_dn *dn,
2065 struct dn_list *list,
2066 enum key_truncation *truncation)
2068 struct ldb_context *ldb;
2069 struct ldb_dn *key;
2070 struct ldb_val val;
2071 int ret;
2073 ldb = ldb_module_get_ctx(module);
2075 /* work out the index key from the parent DN */
2076 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2077 if (val.data == NULL) {
2078 const char *dn_str = ldb_dn_get_linearized(dn);
2079 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2080 __location__
2081 ": Failed to get casefold DN "
2082 "from: %s",
2083 dn_str);
2084 return LDB_ERR_OPERATIONS_ERROR;
2086 val.length = strlen((char *)val.data);
2087 key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
2088 if (!key) {
2089 ldb_oom(ldb);
2090 return LDB_ERR_OPERATIONS_ERROR;
2093 ret = ldb_kv_dn_list_load(module, ldb_kv, key, list,
2094 DN_LIST_WILL_BE_READ_ONLY);
2095 talloc_free(key);
2096 if (ret != LDB_SUCCESS) {
2097 return ret;
2100 if (list->count == 0) {
2101 return LDB_ERR_NO_SUCH_OBJECT;
2104 return LDB_SUCCESS;
2108 return a list of matching objects using a one-level index
2110 static int ldb_kv_index_dn_one(struct ldb_module *module,
2111 struct ldb_kv_private *ldb_kv,
2112 struct ldb_dn *parent_dn,
2113 struct dn_list *list,
2114 enum key_truncation *truncation)
2117 * Ensure we do not shortcut on intersection for this list.
2118 * We must never be lazy and return an entry not in this
2119 * list. This allows the index for
2120 * SCOPE_ONELEVEL to be trusted.
2123 list->strict = true;
2124 return ldb_kv_index_dn_attr(
2125 module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
2129 return a list of matching objects using the DN index
2131 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
2132 struct ldb_kv_private *ldb_kv,
2133 struct ldb_dn *base_dn,
2134 struct dn_list *dn_list,
2135 enum key_truncation *truncation)
2137 const struct ldb_val *guid_val = NULL;
2138 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2139 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2140 if (dn_list->dn == NULL) {
2141 return ldb_module_oom(module);
2143 dn_list->dn[0].data = discard_const_p(unsigned char,
2144 ldb_dn_get_linearized(base_dn));
2145 if (dn_list->dn[0].data == NULL) {
2146 talloc_free(dn_list->dn);
2147 return ldb_module_oom(module);
2149 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
2150 dn_list->count = 1;
2152 return LDB_SUCCESS;
2155 if (ldb_kv->cache->GUID_index_dn_component != NULL) {
2156 guid_val = ldb_dn_get_extended_component(
2157 base_dn, ldb_kv->cache->GUID_index_dn_component);
2160 if (guid_val != NULL) {
2161 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2162 if (dn_list->dn == NULL) {
2163 return ldb_module_oom(module);
2165 dn_list->dn[0].data = guid_val->data;
2166 dn_list->dn[0].length = guid_val->length;
2167 dn_list->count = 1;
2169 return LDB_SUCCESS;
2172 return ldb_kv_index_dn_attr(
2173 module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
2177 return a list of dn's that might match a indexed search or
2178 an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
2180 static int ldb_kv_index_dn(struct ldb_module *module,
2181 struct ldb_kv_private *ldb_kv,
2182 const struct ldb_parse_tree *tree,
2183 struct dn_list *list)
2185 int ret = LDB_ERR_OPERATIONS_ERROR;
2187 switch (tree->operation) {
2188 case LDB_OP_AND:
2189 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
2190 break;
2192 case LDB_OP_OR:
2193 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
2194 break;
2196 case LDB_OP_NOT:
2197 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
2198 break;
2200 case LDB_OP_EQUALITY:
2201 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
2202 break;
2204 case LDB_OP_GREATER:
2205 ret = ldb_kv_index_dn_greater(module, ldb_kv, tree, list);
2206 break;
2208 case LDB_OP_LESS:
2209 ret = ldb_kv_index_dn_less(module, ldb_kv, tree, list);
2210 break;
2212 case LDB_OP_SUBSTRING:
2213 case LDB_OP_PRESENT:
2214 case LDB_OP_APPROX:
2215 case LDB_OP_EXTENDED:
2216 /* we can't index with fancy bitops yet */
2217 ret = LDB_ERR_OPERATIONS_ERROR;
2218 break;
2221 return ret;
2225 filter a candidate dn_list from an indexed search into a set of results
2226 extracting just the given attributes
2228 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
2229 const struct dn_list *dn_list,
2230 struct ldb_kv_context *ac,
2231 uint32_t *match_count,
2232 enum key_truncation scope_one_truncation)
2234 struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2235 struct ldb_message *msg;
2236 struct ldb_message *filtered_msg;
2237 unsigned int i;
2238 unsigned int num_keys = 0;
2239 uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
2240 struct ldb_val *keys = NULL;
2243 * We have to allocate the key list (rather than just walk the
2244 * caller supplied list) as the callback could change the list
2245 * (by modifying an indexed attribute hosted in the in-memory
2246 * index cache!)
2248 keys = talloc_array(ac, struct ldb_val, dn_list->count);
2249 if (keys == NULL) {
2250 return ldb_module_oom(ac->module);
2253 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2255 * We speculate that the keys will be GUID based and so
2256 * pre-fill in enough space for a GUID (avoiding a pile of
2257 * small allocations)
2259 struct guid_tdb_key {
2260 uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2261 } *key_values = NULL;
2263 key_values = talloc_array(keys,
2264 struct guid_tdb_key,
2265 dn_list->count);
2267 if (key_values == NULL) {
2268 talloc_free(keys);
2269 return ldb_module_oom(ac->module);
2271 for (i = 0; i < dn_list->count; i++) {
2272 keys[i].data = key_values[i].guid_key;
2273 keys[i].length = sizeof(key_values[i].guid_key);
2275 } else {
2276 for (i = 0; i < dn_list->count; i++) {
2277 keys[i].data = NULL;
2278 keys[i].length = 0;
2282 for (i = 0; i < dn_list->count; i++) {
2283 int ret;
2285 ret = ldb_kv_idx_to_key(
2286 ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
2287 if (ret != LDB_SUCCESS) {
2288 talloc_free(keys);
2289 return ret;
2292 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2294 * If we are in GUID index mode, then the dn_list is
2295 * sorted. If we got a duplicate, forget about it, as
2296 * otherwise we would send the same entry back more
2297 * than once.
2299 * This is needed in the truncated DN case, or if a
2300 * duplicate was forced in via
2301 * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2304 if (memcmp(previous_guid_key,
2305 keys[num_keys].data,
2306 sizeof(previous_guid_key)) == 0) {
2307 continue;
2310 memcpy(previous_guid_key,
2311 keys[num_keys].data,
2312 sizeof(previous_guid_key));
2314 num_keys++;
2319 * Now that the list is a safe copy, send the callbacks
2321 for (i = 0; i < num_keys; i++) {
2322 int ret;
2323 bool matched;
2324 msg = ldb_msg_new(ac);
2325 if (!msg) {
2326 talloc_free(keys);
2327 return LDB_ERR_OPERATIONS_ERROR;
2330 ret =
2331 ldb_kv_search_key(ac->module,
2332 ldb_kv,
2333 keys[i],
2334 msg,
2335 LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC |
2337 * The entry point ldb_kv_search_indexed is
2338 * only called from the read-locked
2339 * ldb_kv_search.
2341 LDB_UNPACK_DATA_FLAG_READ_LOCKED);
2342 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2344 * the record has disappeared? yes, this can
2345 * happen if the entry is deleted by something
2346 * operating in the callback (not another
2347 * process, as we have a read lock)
2349 talloc_free(msg);
2350 continue;
2353 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2354 /* an internal error */
2355 talloc_free(keys);
2356 talloc_free(msg);
2357 return LDB_ERR_OPERATIONS_ERROR;
2361 * We trust the index for LDB_SCOPE_ONELEVEL
2362 * unless the index key has been truncated.
2364 * LDB_SCOPE_BASE is not passed in by our only caller.
2366 if (ac->scope == LDB_SCOPE_ONELEVEL &&
2367 ldb_kv->cache->one_level_indexes &&
2368 scope_one_truncation == KEY_NOT_TRUNCATED) {
2369 ret = ldb_match_message(ldb, msg, ac->tree,
2370 ac->scope, &matched);
2371 } else {
2372 ret = ldb_match_msg_error(ldb, msg,
2373 ac->tree, ac->base,
2374 ac->scope, &matched);
2377 if (ret != LDB_SUCCESS) {
2378 talloc_free(keys);
2379 talloc_free(msg);
2380 return ret;
2382 if (!matched) {
2383 talloc_free(msg);
2384 continue;
2387 filtered_msg = ldb_msg_new(ac);
2388 if (filtered_msg == NULL) {
2389 TALLOC_FREE(keys);
2390 TALLOC_FREE(msg);
2391 return LDB_ERR_OPERATIONS_ERROR;
2394 filtered_msg->dn = talloc_steal(filtered_msg, msg->dn);
2396 /* filter the attributes that the user wants */
2397 ret = ldb_kv_filter_attrs(ldb, msg, ac->attrs, filtered_msg);
2399 talloc_free(msg);
2401 if (ret == -1) {
2402 TALLOC_FREE(filtered_msg);
2403 talloc_free(keys);
2404 return LDB_ERR_OPERATIONS_ERROR;
2407 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
2408 if (ret != LDB_SUCCESS) {
2409 /* Regardless of success or failure, the msg
2410 * is the callbacks responsiblity, and should
2411 * not be talloc_free()'ed */
2412 ac->request_terminated = true;
2413 talloc_free(keys);
2414 return ret;
2417 (*match_count)++;
2420 TALLOC_FREE(keys);
2421 return LDB_SUCCESS;
2425 sort a DN list
2427 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
2428 struct dn_list *list)
2430 if (list->count < 2) {
2431 return;
2434 /* We know the list is sorted when using the GUID index */
2435 if (ltdb->cache->GUID_index_attribute != NULL) {
2436 return;
2439 TYPESAFE_QSORT(list->dn, list->count,
2440 ldb_val_equal_exact_for_qsort);
2444 search the database with a LDAP-like expression using indexes
2445 returns -1 if an indexed search is not possible, in which
2446 case the caller should call ltdb_search_full()
2448 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
2450 struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2451 struct ldb_kv_private *ldb_kv = talloc_get_type(
2452 ldb_module_get_private(ac->module), struct ldb_kv_private);
2453 struct dn_list *dn_list;
2454 int ret;
2455 enum ldb_scope index_scope;
2456 enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
2458 /* see if indexing is enabled */
2459 if (!ldb_kv->cache->attribute_indexes &&
2460 !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
2461 /* fallback to a full search */
2462 return LDB_ERR_OPERATIONS_ERROR;
2465 dn_list = talloc_zero(ac, struct dn_list);
2466 if (dn_list == NULL) {
2467 return ldb_module_oom(ac->module);
2471 * For the purposes of selecting the switch arm below, if we
2472 * don't have a one-level index then treat it like a subtree
2473 * search
2475 if (ac->scope == LDB_SCOPE_ONELEVEL &&
2476 !ldb_kv->cache->one_level_indexes) {
2477 index_scope = LDB_SCOPE_SUBTREE;
2478 } else {
2479 index_scope = ac->scope;
2482 switch (index_scope) {
2483 case LDB_SCOPE_BASE:
2485 * The only caller will have filtered the operation out
2486 * so we should never get here
2488 return ldb_operr(ldb);
2490 case LDB_SCOPE_ONELEVEL:
2493 * First, load all the one-level child objects (regardless of
2494 * whether they match the search filter or not). The database
2495 * maintains a one-level index, so retrieving this is quick.
2497 ret = ldb_kv_index_dn_one(ac->module,
2498 ldb_kv,
2499 ac->base,
2500 dn_list,
2501 &scope_one_truncation);
2502 if (ret != LDB_SUCCESS) {
2503 talloc_free(dn_list);
2504 return ret;
2508 * If we have too many children, running ldb_kv_index_filter()
2509 * over all the child objects can be quite expensive. So next
2510 * we do a separate indexed query using the search filter.
2512 * This should be quick, but it may return objects that are not
2513 * the direct one-level child objects we're interested in.
2515 * We only do this in the GUID index mode, which is
2516 * O(n*log(m)) otherwise the intersection below will
2517 * be too costly at O(n*m).
2519 * We don't set a heuristic for 'too many' but instead
2520 * do it always and rely on the index lookup being
2521 * fast enough in the small case.
2523 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2524 struct dn_list *indexed_search_result
2525 = talloc_zero(ac, struct dn_list);
2526 if (indexed_search_result == NULL) {
2527 talloc_free(dn_list);
2528 return ldb_module_oom(ac->module);
2531 if (!ldb_kv->cache->attribute_indexes) {
2532 talloc_free(indexed_search_result);
2533 talloc_free(dn_list);
2534 return LDB_ERR_OPERATIONS_ERROR;
2538 * Try to do an indexed database search
2540 ret = ldb_kv_index_dn(
2541 ac->module, ldb_kv, ac->tree,
2542 indexed_search_result);
2545 * We can stop if we're sure the object doesn't exist
2547 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2548 talloc_free(indexed_search_result);
2549 talloc_free(dn_list);
2550 return LDB_ERR_NO_SUCH_OBJECT;
2554 * Once we have a successful search result, we
2555 * intersect it with the one-level children (dn_list).
2556 * This should give us exactly the result we're after
2557 * (we still need to run ldb_kv_index_filter() to
2558 * handle potential index truncation cases).
2560 * The indexed search may fail because we don't support
2561 * indexing on that type of search operation, e.g.
2562 * matching against '*'. In which case we fall through
2563 * and run ldb_kv_index_filter() over all the one-level
2564 * children (which is still better than bailing out here
2565 * and falling back to a full DB scan).
2567 if (ret == LDB_SUCCESS) {
2568 if (!list_intersect(ldb_kv,
2569 dn_list,
2570 indexed_search_result)) {
2571 talloc_free(indexed_search_result);
2572 talloc_free(dn_list);
2573 return LDB_ERR_OPERATIONS_ERROR;
2577 break;
2579 case LDB_SCOPE_SUBTREE:
2580 case LDB_SCOPE_DEFAULT:
2581 if (!ldb_kv->cache->attribute_indexes) {
2582 talloc_free(dn_list);
2583 return LDB_ERR_OPERATIONS_ERROR;
2586 * Here we load the index for the tree. We have no
2587 * index for the subtree.
2589 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2590 if (ret != LDB_SUCCESS) {
2591 talloc_free(dn_list);
2592 return ret;
2594 break;
2598 * It is critical that this function do the re-filter even
2599 * on things found by the index as the index can over-match
2600 * in cases of truncation (as well as when it decides it is
2601 * not worth further filtering)
2603 * If this changes, then the index code above would need to
2604 * pass up a flag to say if any index was truncated during
2605 * processing as the truncation here refers only to the
2606 * SCOPE_ONELEVEL index.
2608 ret = ldb_kv_index_filter(
2609 ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2610 talloc_free(dn_list);
2611 return ret;
2615 * @brief Add a DN in the index list of a given attribute name/value pair
2617 * This function will add the DN in the index list for the index for
2618 * the given attribute name and value.
2620 * @param[in] module A ldb_module structure
2622 * @param[in] dn The string representation of the DN as it
2623 * will be stored in the index entry
2625 * @param[in] el A ldb_message_element array, one of the entry
2626 * referred by the v_idx is the attribute name and
2627 * value pair which will be used to construct the
2628 * index name
2630 * @param[in] v_idx The index of element in the el array to use
2632 * @return An ldb error code
2634 static int ldb_kv_index_add1(struct ldb_module *module,
2635 struct ldb_kv_private *ldb_kv,
2636 const struct ldb_message *msg,
2637 struct ldb_message_element *el,
2638 int v_idx)
2640 struct ldb_context *ldb;
2641 struct ldb_dn *dn_key;
2642 int ret;
2643 const struct ldb_schema_attribute *a;
2644 struct dn_list *list;
2645 unsigned alloc_len;
2646 enum key_truncation truncation = KEY_TRUNCATED;
2649 ldb = ldb_module_get_ctx(module);
2651 list = talloc_zero(module, struct dn_list);
2652 if (list == NULL) {
2653 return LDB_ERR_OPERATIONS_ERROR;
2656 dn_key = ldb_kv_index_key(
2657 ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2658 if (!dn_key) {
2659 talloc_free(list);
2660 return LDB_ERR_OPERATIONS_ERROR;
2663 * Samba only maintains unique indexes on the objectSID and objectGUID
2664 * so if a unique index key exceeds the maximum length there is a
2665 * problem.
2667 if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2668 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2669 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2671 ldb_asprintf_errstring(
2672 ldb,
2673 __location__ ": unique index key on %s in %s, "
2674 "exceeds maximum key length of %u (encoded).",
2675 el->name,
2676 ldb_dn_get_linearized(msg->dn),
2677 ldb_kv->max_key_length);
2678 talloc_free(list);
2679 return LDB_ERR_CONSTRAINT_VIOLATION;
2681 talloc_steal(list, dn_key);
2683 ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
2684 DN_LIST_MUTABLE);
2685 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2686 talloc_free(list);
2687 return ret;
2691 * Check for duplicates in the @IDXDN DN -> GUID record
2693 * This is very normal, it just means a duplicate DN creation
2694 * was attempted, so don't set the error string or print scary
2695 * messages.
2697 if (list->count > 0 &&
2698 ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2699 truncation == KEY_NOT_TRUNCATED) {
2701 talloc_free(list);
2702 return LDB_ERR_CONSTRAINT_VIOLATION;
2704 } else if (list->count > 0
2705 && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2708 * At least one existing entry in the DN->GUID index, which
2709 * arises when the DN indexes have been truncated
2711 * So need to pull the DN's to check if it's really a duplicate
2713 unsigned int i;
2714 for (i=0; i < list->count; i++) {
2715 uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2716 struct ldb_val key = {
2717 .data = guid_key,
2718 .length = sizeof(guid_key)
2720 const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2721 struct ldb_message *rec = ldb_msg_new(ldb);
2722 if (rec == NULL) {
2723 return LDB_ERR_OPERATIONS_ERROR;
2726 ret = ldb_kv_idx_to_key(
2727 module, ldb_kv, ldb, &list->dn[i], &key);
2728 if (ret != LDB_SUCCESS) {
2729 TALLOC_FREE(list);
2730 TALLOC_FREE(rec);
2731 return ret;
2734 ret =
2735 ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2736 if (key.data != guid_key) {
2737 TALLOC_FREE(key.data);
2739 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2741 * the record has disappeared?
2742 * yes, this can happen
2744 talloc_free(rec);
2745 continue;
2748 if (ret != LDB_SUCCESS) {
2749 /* an internal error */
2750 TALLOC_FREE(rec);
2751 TALLOC_FREE(list);
2752 return LDB_ERR_OPERATIONS_ERROR;
2755 * The DN we are trying to add to the DB and index
2756 * is already here, so we must deny the addition
2758 if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2759 TALLOC_FREE(rec);
2760 TALLOC_FREE(list);
2761 return LDB_ERR_CONSTRAINT_VIOLATION;
2767 * Check for duplicates in unique indexes
2769 * We don't need to do a loop test like the @IDXDN case
2770 * above as we have a ban on long unique index values
2771 * at the start of this function.
2773 if (list->count > 0 &&
2774 ((a != NULL
2775 && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2776 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2778 * We do not want to print info about a possibly
2779 * confidential DN that the conflict was with in the
2780 * user-visible error string
2783 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2784 ldb_debug(ldb, LDB_DEBUG_WARNING,
2785 __location__
2786 ": unique index violation on %s in %s, "
2787 "conflicts with %*.*s in %s",
2788 el->name, ldb_dn_get_linearized(msg->dn),
2789 (int)list->dn[0].length,
2790 (int)list->dn[0].length,
2791 list->dn[0].data,
2792 ldb_dn_get_linearized(dn_key));
2793 } else {
2794 /* This can't fail, gives a default at worst */
2795 const struct ldb_schema_attribute *attr =
2796 ldb_schema_attribute_by_name(
2797 ldb, ldb_kv->cache->GUID_index_attribute);
2798 struct ldb_val v;
2799 ret = attr->syntax->ldif_write_fn(ldb, list,
2800 &list->dn[0], &v);
2801 if (ret == LDB_SUCCESS) {
2802 ldb_debug(ldb,
2803 LDB_DEBUG_WARNING,
2804 __location__
2805 ": unique index violation on %s in "
2806 "%s, conflicts with %s %*.*s in %s",
2807 el->name,
2808 ldb_dn_get_linearized(msg->dn),
2809 ldb_kv->cache->GUID_index_attribute,
2810 (int)v.length,
2811 (int)v.length,
2812 v.data,
2813 ldb_dn_get_linearized(dn_key));
2816 ldb_asprintf_errstring(ldb,
2817 __location__ ": unique index violation "
2818 "on %s in %s",
2819 el->name,
2820 ldb_dn_get_linearized(msg->dn));
2821 talloc_free(list);
2822 return LDB_ERR_CONSTRAINT_VIOLATION;
2825 /* overallocate the list a bit, to reduce the number of
2826 * realloc trigered copies */
2827 alloc_len = ((list->count+1)+7) & ~7;
2828 list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2829 if (list->dn == NULL) {
2830 talloc_free(list);
2831 return LDB_ERR_OPERATIONS_ERROR;
2834 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2835 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2836 list->dn[list->count].data
2837 = (uint8_t *)talloc_strdup(list->dn, dn_str);
2838 if (list->dn[list->count].data == NULL) {
2839 talloc_free(list);
2840 return LDB_ERR_OPERATIONS_ERROR;
2842 list->dn[list->count].length = strlen(dn_str);
2843 } else {
2844 const struct ldb_val *key_val;
2845 struct ldb_val *exact = NULL, *next = NULL;
2846 key_val = ldb_msg_find_ldb_val(
2847 msg, ldb_kv->cache->GUID_index_attribute);
2848 if (key_val == NULL) {
2849 talloc_free(list);
2850 return ldb_module_operr(module);
2853 if (key_val->length != LDB_KV_GUID_SIZE) {
2854 talloc_free(list);
2855 return ldb_module_operr(module);
2858 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2859 *key_val, ldb_val_equal_exact_ordered,
2860 exact, next);
2863 * Give a warning rather than fail, this could be a
2864 * duplicate value in the record allowed by a caller
2865 * forcing in the value with
2866 * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2868 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2869 /* This can't fail, gives a default at worst */
2870 const struct ldb_schema_attribute *attr =
2871 ldb_schema_attribute_by_name(
2872 ldb, ldb_kv->cache->GUID_index_attribute);
2873 struct ldb_val v;
2874 ret = attr->syntax->ldif_write_fn(ldb, list,
2875 exact, &v);
2876 if (ret == LDB_SUCCESS) {
2877 ldb_debug(ldb,
2878 LDB_DEBUG_WARNING,
2879 __location__
2880 ": duplicate attribute value in %s "
2881 "for index on %s, "
2882 "duplicate of %s %*.*s in %s",
2883 ldb_dn_get_linearized(msg->dn),
2884 el->name,
2885 ldb_kv->cache->GUID_index_attribute,
2886 (int)v.length,
2887 (int)v.length,
2888 v.data,
2889 ldb_dn_get_linearized(dn_key));
2893 if (next == NULL) {
2894 next = &list->dn[list->count];
2895 } else {
2896 memmove(&next[1], next,
2897 sizeof(*next) * (list->count - (next - list->dn)));
2899 *next = ldb_val_dup(list->dn, key_val);
2900 if (next->data == NULL) {
2901 talloc_free(list);
2902 return ldb_module_operr(module);
2905 list->count++;
2907 ret = ldb_kv_dn_list_store(module, dn_key, list);
2909 talloc_free(list);
2911 return ret;
2915 add index entries for one elements in a message
2917 static int ldb_kv_index_add_el(struct ldb_module *module,
2918 struct ldb_kv_private *ldb_kv,
2919 const struct ldb_message *msg,
2920 struct ldb_message_element *el)
2922 unsigned int i;
2923 for (i = 0; i < el->num_values; i++) {
2924 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2925 if (ret != LDB_SUCCESS) {
2926 return ret;
2930 return LDB_SUCCESS;
2934 add index entries for all elements in a message
2936 static int ldb_kv_index_add_all(struct ldb_module *module,
2937 struct ldb_kv_private *ldb_kv,
2938 const struct ldb_message *msg)
2940 struct ldb_message_element *elements = msg->elements;
2941 unsigned int i;
2942 const char *dn_str;
2943 int ret;
2945 if (ldb_dn_is_special(msg->dn)) {
2946 return LDB_SUCCESS;
2949 dn_str = ldb_dn_get_linearized(msg->dn);
2950 if (dn_str == NULL) {
2951 return LDB_ERR_OPERATIONS_ERROR;
2954 ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2955 if (ret != LDB_SUCCESS) {
2956 return ret;
2959 if (!ldb_kv->cache->attribute_indexes) {
2960 /* no indexed fields */
2961 return LDB_SUCCESS;
2964 for (i = 0; i < msg->num_elements; i++) {
2965 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2966 continue;
2968 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2969 if (ret != LDB_SUCCESS) {
2970 struct ldb_context *ldb = ldb_module_get_ctx(module);
2971 ldb_asprintf_errstring(ldb,
2972 __location__ ": Failed to re-index %s in %s - %s",
2973 elements[i].name, dn_str,
2974 ldb_errstring(ldb));
2975 return ret;
2979 return LDB_SUCCESS;
2984 insert a DN index for a message
2986 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2987 struct ldb_kv_private *ldb_kv,
2988 const struct ldb_message *msg,
2989 struct ldb_dn *dn,
2990 const char *index,
2991 int add)
2993 struct ldb_message_element el;
2994 struct ldb_val val;
2995 int ret;
2997 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2998 if (val.data == NULL) {
2999 const char *dn_str = ldb_dn_get_linearized(dn);
3000 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3001 __location__ ": Failed to modify %s "
3002 "against %s in %s: failed "
3003 "to get casefold DN",
3004 index,
3005 ldb_kv->cache->GUID_index_attribute,
3006 dn_str);
3007 return LDB_ERR_OPERATIONS_ERROR;
3010 val.length = strlen((char *)val.data);
3011 el.name = index;
3012 el.values = &val;
3013 el.num_values = 1;
3015 if (add) {
3016 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
3017 } else { /* delete */
3018 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
3021 if (ret != LDB_SUCCESS) {
3022 struct ldb_context *ldb = ldb_module_get_ctx(module);
3023 const char *dn_str = ldb_dn_get_linearized(dn);
3024 ldb_asprintf_errstring(ldb,
3025 __location__ ": Failed to modify %s "
3026 "against %s in %s - %s",
3027 index,
3028 ldb_kv->cache->GUID_index_attribute,
3029 dn_str,
3030 ldb_errstring(ldb));
3031 return ret;
3033 return ret;
3037 insert a one level index for a message
3039 static int ldb_kv_index_onelevel(struct ldb_module *module,
3040 const struct ldb_message *msg,
3041 int add)
3043 struct ldb_kv_private *ldb_kv = talloc_get_type(
3044 ldb_module_get_private(module), struct ldb_kv_private);
3045 struct ldb_dn *pdn;
3046 int ret;
3048 /* We index for ONE Level only if requested */
3049 if (!ldb_kv->cache->one_level_indexes) {
3050 return LDB_SUCCESS;
3053 pdn = ldb_dn_get_parent(module, msg->dn);
3054 if (pdn == NULL) {
3055 return LDB_ERR_OPERATIONS_ERROR;
3057 ret =
3058 ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
3060 talloc_free(pdn);
3062 return ret;
3066 insert a one level index for a message
3068 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
3069 const struct ldb_message *msg,
3070 int add)
3072 int ret;
3073 struct ldb_kv_private *ldb_kv = talloc_get_type(
3074 ldb_module_get_private(module), struct ldb_kv_private);
3076 /* We index for DN only if using a GUID index */
3077 if (ldb_kv->cache->GUID_index_attribute == NULL) {
3078 return LDB_SUCCESS;
3081 ret = ldb_kv_modify_index_dn(
3082 module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
3084 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
3085 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3086 "Entry %s already exists",
3087 ldb_dn_get_linearized(msg->dn));
3088 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
3090 return ret;
3094 add the index entries for a new element in a record
3095 The caller guarantees that these element values are not yet indexed
3097 int ldb_kv_index_add_element(struct ldb_module *module,
3098 struct ldb_kv_private *ldb_kv,
3099 const struct ldb_message *msg,
3100 struct ldb_message_element *el)
3102 if (ldb_dn_is_special(msg->dn)) {
3103 return LDB_SUCCESS;
3105 if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3106 return LDB_SUCCESS;
3108 return ldb_kv_index_add_el(module, ldb_kv, msg, el);
3112 add the index entries for a new record
3114 int ldb_kv_index_add_new(struct ldb_module *module,
3115 struct ldb_kv_private *ldb_kv,
3116 const struct ldb_message *msg)
3118 int ret;
3120 if (ldb_dn_is_special(msg->dn)) {
3121 return LDB_SUCCESS;
3124 ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3125 if (ret != LDB_SUCCESS) {
3127 * Because we can't trust the caller to be doing
3128 * transactions properly, clean up any index for this
3129 * entry rather than relying on a transaction
3130 * cleanup
3133 ldb_kv_index_delete(module, msg);
3134 return ret;
3137 ret = ldb_kv_index_onelevel(module, msg, 1);
3138 if (ret != LDB_SUCCESS) {
3140 * Because we can't trust the caller to be doing
3141 * transactions properly, clean up any index for this
3142 * entry rather than relying on a transaction
3143 * cleanup
3145 ldb_kv_index_delete(module, msg);
3146 return ret;
3148 return ret;
3153 delete an index entry for one message element
3155 int ldb_kv_index_del_value(struct ldb_module *module,
3156 struct ldb_kv_private *ldb_kv,
3157 const struct ldb_message *msg,
3158 struct ldb_message_element *el,
3159 unsigned int v_idx)
3161 struct ldb_context *ldb;
3162 struct ldb_dn *dn_key;
3163 const char *dn_str;
3164 int ret, i;
3165 unsigned int j;
3166 struct dn_list *list;
3167 struct ldb_dn *dn = msg->dn;
3168 enum key_truncation truncation = KEY_NOT_TRUNCATED;
3170 ldb = ldb_module_get_ctx(module);
3172 dn_str = ldb_dn_get_linearized(dn);
3173 if (dn_str == NULL) {
3174 return LDB_ERR_OPERATIONS_ERROR;
3177 if (dn_str[0] == '@') {
3178 return LDB_SUCCESS;
3181 dn_key = ldb_kv_index_key(
3182 ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
3184 * We ignore key truncation in ltdb_index_add1() so
3185 * match that by ignoring it here as well
3187 * Multiple values are legitimate and accepted
3189 if (!dn_key) {
3190 return LDB_ERR_OPERATIONS_ERROR;
3193 list = talloc_zero(dn_key, struct dn_list);
3194 if (list == NULL) {
3195 talloc_free(dn_key);
3196 return LDB_ERR_OPERATIONS_ERROR;
3199 ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
3200 DN_LIST_MUTABLE);
3201 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
3202 /* it wasn't indexed. Did we have an earlier error? If we did then
3203 its gone now */
3204 talloc_free(dn_key);
3205 return LDB_SUCCESS;
3208 if (ret != LDB_SUCCESS) {
3209 talloc_free(dn_key);
3210 return ret;
3214 * Find one of the values matching this message to remove
3216 i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
3217 if (i == -1) {
3218 /* nothing to delete */
3219 talloc_free(dn_key);
3220 return LDB_SUCCESS;
3223 j = (unsigned int) i;
3224 if (j != list->count - 1) {
3225 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
3227 list->count--;
3228 if (list->count == 0) {
3229 talloc_free(list->dn);
3230 list->dn = NULL;
3231 } else {
3232 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
3235 ret = ldb_kv_dn_list_store(module, dn_key, list);
3237 talloc_free(dn_key);
3239 return ret;
3243 delete the index entries for a element
3244 return -1 on failure
3246 int ldb_kv_index_del_element(struct ldb_module *module,
3247 struct ldb_kv_private *ldb_kv,
3248 const struct ldb_message *msg,
3249 struct ldb_message_element *el)
3251 const char *dn_str;
3252 int ret;
3253 unsigned int i;
3255 if (!ldb_kv->cache->attribute_indexes) {
3256 /* no indexed fields */
3257 return LDB_SUCCESS;
3260 dn_str = ldb_dn_get_linearized(msg->dn);
3261 if (dn_str == NULL) {
3262 return LDB_ERR_OPERATIONS_ERROR;
3265 if (dn_str[0] == '@') {
3266 return LDB_SUCCESS;
3269 if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3270 return LDB_SUCCESS;
3272 for (i = 0; i < el->num_values; i++) {
3273 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
3274 if (ret != LDB_SUCCESS) {
3275 return ret;
3279 return LDB_SUCCESS;
3283 delete the index entries for a record
3284 return -1 on failure
3286 int ldb_kv_index_delete(struct ldb_module *module,
3287 const struct ldb_message *msg)
3289 struct ldb_kv_private *ldb_kv = talloc_get_type(
3290 ldb_module_get_private(module), struct ldb_kv_private);
3291 int ret;
3292 unsigned int i;
3294 if (ldb_dn_is_special(msg->dn)) {
3295 return LDB_SUCCESS;
3298 ret = ldb_kv_index_onelevel(module, msg, 0);
3299 if (ret != LDB_SUCCESS) {
3300 return ret;
3303 ret = ldb_kv_write_index_dn_guid(module, msg, 0);
3304 if (ret != LDB_SUCCESS) {
3305 return ret;
3308 if (!ldb_kv->cache->attribute_indexes) {
3309 /* no indexed fields */
3310 return LDB_SUCCESS;
3313 for (i = 0; i < msg->num_elements; i++) {
3314 ret = ldb_kv_index_del_element(
3315 module, ldb_kv, msg, &msg->elements[i]);
3316 if (ret != LDB_SUCCESS) {
3317 return ret;
3321 return LDB_SUCCESS;
3326 traversal function that deletes all @INDEX records in the in-memory
3327 TDB.
3329 This does not touch the actual DB, that is done at transaction
3330 commit, which in turn greatly reduces DB churn as we will likely
3331 be able to do a direct update into the old record.
3333 static int delete_index(struct ldb_kv_private *ldb_kv,
3334 struct ldb_val key,
3335 _UNUSED_ struct ldb_val data,
3336 void *state)
3338 struct ldb_module *module = state;
3339 const char *dnstr = "DN=" LDB_KV_INDEX ":";
3340 struct dn_list list;
3341 struct ldb_dn *dn;
3342 struct ldb_val v;
3343 int ret;
3345 if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
3346 return 0;
3348 /* we need to put a empty list in the internal tdb for this
3349 * index entry */
3350 list.dn = NULL;
3351 list.count = 0;
3353 /* the offset of 3 is to remove the DN= prefix. */
3354 v.data = key.data + 3;
3355 v.length = strnlen((char *)key.data, key.length) - 3;
3357 dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
3360 * This does not actually touch the DB quite yet, just
3361 * the in-memory index cache
3363 ret = ldb_kv_dn_list_store(module, dn, &list);
3364 if (ret != LDB_SUCCESS) {
3365 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3366 "Unable to store null index for %s\n",
3367 ldb_dn_get_linearized(dn));
3368 talloc_free(dn);
3369 return -1;
3371 talloc_free(dn);
3372 return 0;
3376 traversal function that adds @INDEX records during a re index TODO wrong comment
3378 static int re_key(struct ldb_kv_private *ldb_kv,
3379 struct ldb_val key,
3380 struct ldb_val val,
3381 void *state)
3383 struct ldb_context *ldb;
3384 struct ldb_kv_reindex_context *ctx =
3385 (struct ldb_kv_reindex_context *)state;
3386 struct ldb_module *module = ldb_kv->module;
3387 struct ldb_message *msg;
3388 int ret;
3389 struct ldb_val key2;
3390 bool is_record;
3392 ldb = ldb_module_get_ctx(module);
3394 is_record = ldb_kv_key_is_normal_record(key);
3395 if (is_record == false) {
3396 return 0;
3399 msg = ldb_msg_new(module);
3400 if (msg == NULL) {
3401 return -1;
3404 ret = ldb_unpack_data(ldb, &val, msg);
3405 if (ret != 0) {
3406 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3407 ldb_dn_get_linearized(msg->dn));
3408 ctx->error = ret;
3409 talloc_free(msg);
3410 return -1;
3413 if (msg->dn == NULL) {
3414 ldb_debug(ldb, LDB_DEBUG_ERROR,
3415 "Refusing to re-index as GUID "
3416 "key %*.*s with no DN\n",
3417 (int)key.length, (int)key.length,
3418 (char *)key.data);
3419 talloc_free(msg);
3420 return -1;
3423 /* check if the DN key has changed, perhaps due to the case
3424 insensitivity of an element changing, or a change from DN
3425 to GUID keys */
3426 key2 = ldb_kv_key_msg(module, msg, msg);
3427 if (key2.data == NULL) {
3428 /* probably a corrupt record ... darn */
3429 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
3430 ldb_dn_get_linearized(msg->dn));
3431 talloc_free(msg);
3432 return 0;
3434 if (key.length != key2.length ||
3435 (memcmp(key.data, key2.data, key.length) != 0)) {
3436 ldb_kv->kv_ops->update_in_iterate(
3437 ldb_kv, key, key2, val, ctx);
3439 talloc_free(key2.data);
3441 talloc_free(msg);
3443 ctx->count++;
3444 if (ctx->count % 10000 == 0) {
3445 ldb_debug(ldb, LDB_DEBUG_WARNING,
3446 "Reindexing: re-keyed %u records so far",
3447 ctx->count);
3450 return 0;
3454 traversal function that adds @INDEX records during a re index
3456 static int re_index(struct ldb_kv_private *ldb_kv,
3457 struct ldb_val key,
3458 struct ldb_val val,
3459 void *state)
3461 struct ldb_context *ldb;
3462 struct ldb_kv_reindex_context *ctx =
3463 (struct ldb_kv_reindex_context *)state;
3464 struct ldb_module *module = ldb_kv->module;
3465 struct ldb_message *msg;
3466 int ret;
3467 bool is_record;
3469 ldb = ldb_module_get_ctx(module);
3471 is_record = ldb_kv_key_is_normal_record(key);
3472 if (is_record == false) {
3473 return 0;
3476 msg = ldb_msg_new(module);
3477 if (msg == NULL) {
3478 return -1;
3481 ret = ldb_unpack_data(ldb, &val, msg);
3482 if (ret != 0) {
3483 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3484 ldb_dn_get_linearized(msg->dn));
3485 ctx->error = ret;
3486 talloc_free(msg);
3487 return -1;
3490 if (msg->dn == NULL) {
3491 ldb_debug(ldb, LDB_DEBUG_ERROR,
3492 "Refusing to re-index as GUID "
3493 "key %*.*s with no DN\n",
3494 (int)key.length, (int)key.length,
3495 (char *)key.data);
3496 talloc_free(msg);
3497 return -1;
3500 ret = ldb_kv_index_onelevel(module, msg, 1);
3501 if (ret != LDB_SUCCESS) {
3502 ldb_debug(ldb, LDB_DEBUG_ERROR,
3503 "Adding special ONE LEVEL index failed (%s)!",
3504 ldb_dn_get_linearized(msg->dn));
3505 talloc_free(msg);
3506 return -1;
3509 ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3511 if (ret != LDB_SUCCESS) {
3512 ctx->error = ret;
3513 talloc_free(msg);
3514 return -1;
3517 talloc_free(msg);
3519 ctx->count++;
3520 if (ctx->count % 10000 == 0) {
3521 ldb_debug(ldb, LDB_DEBUG_WARNING,
3522 "Reindexing: re-indexed %u records so far",
3523 ctx->count);
3526 return 0;
3529 static int re_pack(struct ldb_kv_private *ldb_kv,
3530 _UNUSED_ struct ldb_val key,
3531 struct ldb_val val,
3532 void *state)
3534 struct ldb_context *ldb;
3535 struct ldb_message *msg;
3536 struct ldb_module *module = ldb_kv->module;
3537 struct ldb_kv_repack_context *ctx =
3538 (struct ldb_kv_repack_context *)state;
3539 int ret;
3541 ldb = ldb_module_get_ctx(module);
3543 msg = ldb_msg_new(module);
3544 if (msg == NULL) {
3545 return -1;
3548 ret = ldb_unpack_data(ldb, &val, msg);
3549 if (ret != 0) {
3550 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: unpack failed: %s\n",
3551 ldb_dn_get_linearized(msg->dn));
3552 ctx->error = ret;
3553 talloc_free(msg);
3554 return -1;
3557 ret = ldb_kv_store(module, msg, TDB_MODIFY);
3558 if (ret != LDB_SUCCESS) {
3559 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: store failed: %s\n",
3560 ldb_dn_get_linearized(msg->dn));
3561 ctx->error = ret;
3562 talloc_free(msg);
3563 return -1;
3567 * Warn the user that we're repacking the first time we see a normal
3568 * record. This means we never warn if we're repacking a database with
3569 * only @ records. This is because during database initialisation,
3570 * we might repack as initial settings are written out, and we don't
3571 * want to spam the log.
3573 if ((!ctx->normal_record_seen) && (!ldb_dn_is_special(msg->dn))) {
3574 ldb_debug(ldb, LDB_DEBUG_WARNING,
3575 "Repacking database with format %#010x",
3576 ldb_kv->pack_format_version);
3577 ctx->normal_record_seen = true;
3580 ctx->count++;
3581 if (ctx->count % 10000 == 0) {
3582 ldb_debug(ldb, LDB_DEBUG_WARNING,
3583 "Repack: re-packed %u records so far",
3584 ctx->count);
3587 return 0;
3590 int ldb_kv_repack(struct ldb_module *module)
3592 struct ldb_kv_private *ldb_kv = talloc_get_type(
3593 ldb_module_get_private(module), struct ldb_kv_private);
3594 struct ldb_context *ldb = ldb_module_get_ctx(module);
3595 struct ldb_kv_repack_context ctx;
3596 int ret;
3598 ctx.count = 0;
3599 ctx.error = LDB_SUCCESS;
3600 ctx.normal_record_seen = false;
3602 /* Iterate all database records and repack them in the new format */
3603 ret = ldb_kv->kv_ops->iterate(ldb_kv, re_pack, &ctx);
3604 if (ret < 0) {
3605 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack traverse failed: %s",
3606 ldb_errstring(ldb));
3607 return LDB_ERR_OPERATIONS_ERROR;
3610 if (ctx.error != LDB_SUCCESS) {
3611 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack failed: %s",
3612 ldb_errstring(ldb));
3613 return ctx.error;
3616 return LDB_SUCCESS;
3620 force a complete reindex of the database
3622 int ldb_kv_reindex(struct ldb_module *module)
3624 struct ldb_kv_private *ldb_kv = talloc_get_type(
3625 ldb_module_get_private(module), struct ldb_kv_private);
3626 int ret;
3627 struct ldb_kv_reindex_context ctx;
3628 size_t index_cache_size = 0;
3631 * Only triggered after a modification, but make clear we do
3632 * not re-index a read-only DB
3634 if (ldb_kv->read_only) {
3635 return LDB_ERR_UNWILLING_TO_PERFORM;
3638 if (ldb_kv_cache_reload(module) != 0) {
3639 return LDB_ERR_OPERATIONS_ERROR;
3643 * Ensure we read (and so remove) the entries from the real
3644 * DB, no values stored so far are any use as we want to do a
3645 * re-index
3647 ldb_kv_index_transaction_cancel(module);
3648 if (ldb_kv->nested_idx_ptr != NULL) {
3649 ldb_kv_index_sub_transaction_cancel(ldb_kv);
3653 * Calculate the size of the index cache that we'll need for
3654 * the re-index
3656 index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
3657 if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
3658 index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
3662 * Note that we don't start an index sub transaction for re-indexing
3664 ret = ldb_kv_index_transaction_start(module, index_cache_size);
3665 if (ret != LDB_SUCCESS) {
3666 return ret;
3669 /* first traverse the database deleting any @INDEX records by
3670 * putting NULL entries in the in-memory tdb
3672 ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3673 if (ret < 0) {
3674 struct ldb_context *ldb = ldb_module_get_ctx(module);
3675 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3676 ldb_errstring(ldb));
3677 return LDB_ERR_OPERATIONS_ERROR;
3680 ctx.error = 0;
3681 ctx.count = 0;
3683 ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3684 if (ret < 0) {
3685 struct ldb_context *ldb = ldb_module_get_ctx(module);
3686 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3687 ldb_errstring(ldb));
3688 return LDB_ERR_OPERATIONS_ERROR;
3691 if (ctx.error != LDB_SUCCESS) {
3692 struct ldb_context *ldb = ldb_module_get_ctx(module);
3693 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3694 return ctx.error;
3697 ctx.error = 0;
3698 ctx.count = 0;
3700 /* now traverse adding any indexes for normal LDB records */
3701 ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3702 if (ret < 0) {
3703 struct ldb_context *ldb = ldb_module_get_ctx(module);
3704 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3705 ldb_errstring(ldb));
3706 return LDB_ERR_OPERATIONS_ERROR;
3709 if (ctx.error != LDB_SUCCESS) {
3710 struct ldb_context *ldb = ldb_module_get_ctx(module);
3711 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3712 return ctx.error;
3715 if (ctx.count > 10000) {
3716 ldb_debug(ldb_module_get_ctx(module),
3717 LDB_DEBUG_WARNING,
3718 "Reindexing: re_index successful on %s, "
3719 "final index write-out will be in transaction commit",
3720 ldb_kv->kv_ops->name(ldb_kv));
3722 return LDB_SUCCESS;
3726 * Copy the contents of the nested transaction index cache record to the
3727 * transaction index cache.
3729 * During this 'commit' of the subtransaction to the main transaction
3730 * (cache), care must be taken to free any existing index at the top
3731 * level because otherwise we would leak memory.
3733 static int ldb_kv_sub_transaction_traverse(
3734 struct tdb_context *tdb,
3735 TDB_DATA key,
3736 TDB_DATA data,
3737 void *state)
3739 struct ldb_module *module = state;
3740 struct ldb_kv_private *ldb_kv = talloc_get_type(
3741 ldb_module_get_private(module), struct ldb_kv_private);
3742 TDB_DATA rec = {0};
3743 struct dn_list *index_in_subtransaction = NULL;
3744 struct dn_list *index_in_top_level = NULL;
3745 int ret = 0;
3748 * This unwraps the pointer in the DB into a pointer in
3749 * memory, we are abusing TDB as a hash map, not a linearised
3750 * database store
3752 index_in_subtransaction = ldb_kv_index_idxptr(module, data);
3753 if (index_in_subtransaction == NULL) {
3754 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
3755 return -1;
3759 * Do we already have an entry in the primary transaction cache
3760 * If so free it's dn_list and replace it with the dn_list from
3761 * the secondary cache
3763 * The TDB and so the fetched rec contains NO DATA, just a
3764 * pointer to data held in memory.
3766 rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
3767 if (rec.dptr != NULL) {
3768 index_in_top_level = ldb_kv_index_idxptr(module, rec);
3769 free(rec.dptr);
3770 if (index_in_top_level == NULL) {
3771 abort();
3774 * We had this key at the top level. However we made a copy
3775 * at the sub-transaction level so that we could possibly
3776 * roll back. We have to free the top level index memory
3777 * otherwise we would leak
3779 if (index_in_top_level->count > 0) {
3780 TALLOC_FREE(index_in_top_level->dn);
3782 index_in_top_level->dn
3783 = talloc_steal(index_in_top_level,
3784 index_in_subtransaction->dn);
3785 index_in_top_level->count = index_in_subtransaction->count;
3786 return 0;
3789 index_in_top_level = talloc(ldb_kv->idxptr, struct dn_list);
3790 if (index_in_top_level == NULL) {
3791 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
3792 return -1;
3794 index_in_top_level->dn
3795 = talloc_steal(index_in_top_level,
3796 index_in_subtransaction->dn);
3797 index_in_top_level->count = index_in_subtransaction->count;
3799 rec.dptr = (uint8_t *)&index_in_top_level;
3800 rec.dsize = sizeof(void *);
3804 * This is not a store into the main DB, but into an in-memory
3805 * TDB, so we don't need a guard on ltdb->read_only
3807 ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
3808 if (ret != 0) {
3809 ldb_kv->idxptr->error = ltdb_err_map(
3810 tdb_error(ldb_kv->idxptr->itdb));
3811 return -1;
3813 return 0;
3817 * Initialise the index cache for a sub transaction.
3819 int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
3821 ldb_kv->nested_idx_ptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
3822 if (ldb_kv->nested_idx_ptr == NULL) {
3823 return LDB_ERR_OPERATIONS_ERROR;
3827 * We use a tiny hash size for the sub-database (11).
3829 * The sub-transaction is only for one record at a time, we
3830 * would use a linked list but that would make the code even
3831 * more complex when manipulating the index, as it would have
3832 * to know if we were in a nested transaction (normal
3833 * operations) or the top one (a reindex).
3835 ldb_kv->nested_idx_ptr->itdb =
3836 tdb_open(NULL, 11, TDB_INTERNAL, O_RDWR, 0);
3837 if (ldb_kv->nested_idx_ptr->itdb == NULL) {
3838 return LDB_ERR_OPERATIONS_ERROR;
3840 return LDB_SUCCESS;
3844 * Clear the contents of the nested transaction index cache when the nested
3845 * transaction is cancelled.
3847 int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
3849 if (ldb_kv->nested_idx_ptr != NULL) {
3850 tdb_close(ldb_kv->nested_idx_ptr->itdb);
3851 TALLOC_FREE(ldb_kv->nested_idx_ptr);
3853 return LDB_SUCCESS;
3857 * Commit a nested transaction,
3858 * Copy the contents of the nested transaction index cache to the
3859 * transaction index cache.
3861 int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
3863 int ret = 0;
3865 if (ldb_kv->nested_idx_ptr == NULL) {
3866 return LDB_SUCCESS;
3868 if (ldb_kv->nested_idx_ptr->itdb == NULL) {
3869 return LDB_SUCCESS;
3871 tdb_traverse(
3872 ldb_kv->nested_idx_ptr->itdb,
3873 ldb_kv_sub_transaction_traverse,
3874 ldb_kv->module);
3875 tdb_close(ldb_kv->nested_idx_ptr->itdb);
3876 ldb_kv->nested_idx_ptr->itdb = NULL;
3878 ret = ldb_kv->nested_idx_ptr->error;
3879 if (ret != LDB_SUCCESS) {
3880 struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
3881 if (!ldb_errstring(ldb)) {
3882 ldb_set_errstring(ldb, ldb_strerror(ret));
3884 ldb_asprintf_errstring(
3885 ldb,
3886 __location__": Failed to update index records in "
3887 "sub transaction commit: %s",
3888 ldb_errstring(ldb));
3890 TALLOC_FREE(ldb_kv->nested_idx_ptr);
3891 return ret;