ldb: Improve coding style in ldb_kv_index_dn_simple()
[Samba.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
blob2935b95c4ddce0e230d4e9a142b614043993e677
1 /*
2 ldb database library
4 Copyright (C) Andrew Tridgell 2004-2009
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 * Name: ldb
27 * Component: ldb key value backend - indexing
29 * Description: indexing routines for ldb key value backend
31 * Author: Andrew Tridgell
36 LDB Index design and choice of key:
37 =======================================
39 LDB has index records held as LDB objects with a special record like:
41 dn: @INDEX:attr:value
43 value may be base64 encoded, if it is deemed not printable:
45 dn: @INDEX:attr::base64-value
47 Each record uses one of two possible formats:
49 The original format is:
50 -----------------------
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
56 In this format, @IDX is multi-valued, one entry for each match
58 The corresponding entry is stored in a TDB record with key:
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
65 The original mixed-case DN is stored in the entry itself.
68 The new 'GUID index' format is:
69 -------------------------------
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
75 The binary guid is 16 bytes, as bytes and not expanded as hexadecimal
76 or pretty-printed. The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
82 The corresponding entry is stored in a TDB record with key:
84 GUID=<binary GUID>
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while separating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
95 Exception for special @ DNs:
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
102 Control points for choice of index mode
103 ---------------------------------------
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
112 By default, the original DN format is used.
115 Control points for choosing indexed attributes
116 ----------------------------------------------
118 @IDXATTR controls if an attribute is indexed
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
125 C Override functions
126 --------------------
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129 const char *GUID_index_attribute,
130 const char *GUID_index_dn_component)
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136 bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138 ldb_attribute_handler_override_fn_t override,
139 void *private_data);
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151 #include "lib/util/attr.h"
/* In-memory list of index values, as loaded from or stored to an @IDX record. */
153 struct dn_list {
/* number of entries in the dn array */
154 unsigned int count;
/* array of values: linearized DN strings, or GUIDs when a GUID index attribute is configured */
155 struct ldb_val *dn;
157 * Do not optimise the intersection of this list,
158 * we must never return an entry not in this
159 * list. This allows the index for
160 * SCOPE_ONELEVEL to be trusted.
162 bool strict;
/* Index cache state created by ldb_kv_index_transaction_start(). */
165 struct ldb_kv_idxptr {
167 * In memory tdb to cache the index updates performed during a
168 * transaction. This improves the performance of operations like
169 * re-index and join
171 struct tdb_context *itdb;
/* LDB error code set by ldb_kv_index_traverse_store() and returned by ldb_kv_index_transaction_commit() */
172 int error;
/* Out-parameter of ldb_kv_index_key(): was the value shortened to fit the maximum key length? */
175 enum key_truncation {
176 KEY_NOT_TRUNCATED,
/* the key was shortened; ldb_kv_key_dn_from_idx() then checks the actual records for the wanted DN */
177 KEY_TRUNCATED,
/* Forward declarations of static helpers; their definitions are not in this part of the file. */
180 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
181 const struct ldb_message *msg,
182 int add);
/* Used by ldb_kv_key_dn_from_idx() to fetch the index list for a base DN. */
183 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
184 struct ldb_kv_private *ldb_kv,
185 struct ldb_dn *base_dn,
186 struct dn_list *dn_list,
187 enum key_truncation *truncation);
189 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
190 struct dn_list *list);
192 /* we put a @IDXVERSION attribute on index entries. This
193 allows us to tell if it was written by an older version
/* @IDXVERSION expected by ldb_kv_dn_list_load() when no GUID index attribute is configured */
195 #define LDB_KV_INDEXING_VERSION 2
/* @IDXVERSION expected by ldb_kv_dn_list_load() when a GUID index attribute is configured */
197 #define LDB_KV_GUID_INDEXING_VERSION 3
199 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
201 if (ldb_kv->max_key_length == 0) {
202 return UINT_MAX;
204 return ldb_kv->max_key_length;
207 /* enable the idxptr mode when transactions start */
208 int ldb_kv_index_transaction_start(
209 struct ldb_module *module,
210 size_t cache_size)
212 struct ldb_kv_private *ldb_kv = talloc_get_type(
213 ldb_module_get_private(module), struct ldb_kv_private);
214 ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
215 if (ldb_kv->idxptr == NULL) {
216 return ldb_oom(ldb_module_get_ctx(module));
219 ldb_kv->idxptr->itdb = tdb_open(
220 NULL,
221 cache_size,
222 TDB_INTERNAL,
223 O_RDWR,
225 if (ldb_kv->idxptr->itdb == NULL) {
226 return LDB_ERR_OPERATIONS_ERROR;
229 return LDB_SUCCESS;
233 see if two ldb_val structures contain exactly the same data
234 return -1 or 1 for a mismatch, 0 for match
236 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
237 const struct ldb_val *v2)
239 if (v1->length > v2->length) {
240 return -1;
242 if (v1->length < v2->length) {
243 return 1;
245 return memcmp(v1->data, v2->data, v1->length);
249 see if two ldb_val structures contain exactly the same data
250 return -1 or 1 for a mismatch, 0 for match
252 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
253 const struct ldb_val *v2)
255 if (v1.length > v2->length) {
256 return -1;
258 if (v1.length < v2->length) {
259 return 1;
261 return memcmp(v1.data, v2->data, v1.length);
266 find a entry in a dn_list, using a ldb_val. Uses a case sensitive
267 binary-safe comparison for the 'dn' returns -1 if not found
269 This is therefore safe when the value is a GUID in the future
271 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
272 const struct dn_list *list,
273 const struct ldb_val *v)
275 unsigned int i;
276 struct ldb_val *exact = NULL, *next = NULL;
278 if (ldb_kv->cache->GUID_index_attribute == NULL) {
279 for (i=0; i<list->count; i++) {
280 if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
281 return i;
284 return -1;
287 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
288 *v, ldb_val_equal_exact_ordered,
289 exact, next);
290 if (exact == NULL) {
291 return -1;
293 /* Not required, but keeps the compiler quiet */
294 if (next != NULL) {
295 return -1;
298 i = exact - list->dn;
299 return i;
303 find a entry in a dn_list. Uses a case sensitive comparison with the dn
304 returns -1 if not found
306 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
307 struct dn_list *list,
308 const struct ldb_message *msg)
310 struct ldb_val v;
311 const struct ldb_val *key_val;
312 if (ldb_kv->cache->GUID_index_attribute == NULL) {
313 const char *dn_str = ldb_dn_get_linearized(msg->dn);
314 v.data = discard_const_p(unsigned char, dn_str);
315 v.length = strlen(dn_str);
316 } else {
317 key_val = ldb_msg_find_ldb_val(
318 msg, ldb_kv->cache->GUID_index_attribute);
319 if (key_val == NULL) {
320 return -1;
322 v = *key_val;
324 return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
328 this is effectively a cast function, but with lots of paranoia
329 checks and also copes with CPUs that are fussy about pointer
330 alignment
332 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
333 TDB_DATA rec)
335 struct dn_list *list;
336 if (rec.dsize != sizeof(void *)) {
337 ldb_asprintf_errstring(ldb_module_get_ctx(module),
338 "Bad data size for idxptr %u", (unsigned)rec.dsize);
339 return NULL;
341 /* note that we can't just use a cast here, as rec.dptr may
342 not be aligned sufficiently for a pointer. A cast would cause
343 platforms like some ARM CPUs to crash */
344 memcpy(&list, rec.dptr, sizeof(void *));
345 list = talloc_get_type(list, struct dn_list);
346 if (list == NULL) {
347 ldb_asprintf_errstring(ldb_module_get_ctx(module),
348 "Bad type '%s' for idxptr",
349 talloc_get_name(list));
350 return NULL;
352 return list;
/* Tells ldb_kv_dn_list_load() whether the caller may later modify the returned list. */
355 enum dn_list_will_be_read_only {
356 DN_LIST_MUTABLE = 0,
/* the load may return a shallow copy of the cached list without taking a private copy */
357 DN_LIST_WILL_BE_READ_ONLY = 1,
/*
 * Load the @IDX values of the index record 'dn' into 'list'.
 *
 * The in-memory caches are tried first (nested transaction cache, then
 * the transaction cache); only if the record is in neither is it read
 * from the database with ldb_kv_search_dn1().
 *
 * list is reset (dn = NULL, count = 0, strict = false) before anything
 * else, so it is empty on every failure path that does not fill it.
 *
 * Returns LDB_SUCCESS (also when the record has no @IDX element, in
 * which case list stays empty), the error from ldb_kv_search_dn1(), or
 * LDB_ERR_OPERATIONS_ERROR for a bad cache record, allocation failure,
 * a wrong @IDXVERSION or a malformed GUID value.
 */
361 return the @IDX list in an index entry for a dn as a
362 struct dn_list
364 static int ldb_kv_dn_list_load(struct ldb_module *module,
365 struct ldb_kv_private *ldb_kv,
366 struct ldb_dn *dn,
367 struct dn_list *list,
368 enum dn_list_will_be_read_only read_only)
370 struct ldb_message *msg;
371 int ret, version;
372 struct ldb_message_element *el;
373 TDB_DATA rec = {0};
374 struct dn_list *list2;
375 bool from_primary_cache = false;
376 TDB_DATA key = {0};
378 list->dn = NULL;
379 list->count = 0;
380 list->strict = false;
383 * See if we have an in memory index cache
385 if (ldb_kv->idxptr == NULL) {
386 goto normal_index;
/* NOTE(review): the linearized DN goes to strlen() unchecked here, whereas ldb_kv_dn_list_store() tests it for NULL - confirm it cannot be NULL for index DNs */
389 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
390 key.dsize = strlen((char *)key.dptr);
393 * Have we cached this index record?
394 * If we have a nested transaction cache try that first.
395 * then try the transaction cache.
396 * if the record is not cached it will need to be read from disk.
398 if (ldb_kv->nested_idx_ptr != NULL) {
399 rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
401 if (rec.dptr == NULL) {
402 from_primary_cache = true;
403 rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
405 if (rec.dptr == NULL) {
406 goto normal_index;
409 /* we've found an in-memory index entry */
410 list2 = ldb_kv_index_idxptr(module, rec);
/* the fetched record buffer is released with free() on both paths; list2 is the pointer that was stored inside it */
411 if (list2 == NULL) {
412 free(rec.dptr);
413 return LDB_ERR_OPERATIONS_ERROR;
415 free(rec.dptr);
418 * If this is a read only transaction the indexes will not be
419 * changed so we don't need a copy in the event of a rollback
421 * In this case make an early return
423 if (read_only == DN_LIST_WILL_BE_READ_ONLY) {
/* struct copy: list->dn now points at the same array as the cached entry */
424 *list = *list2;
425 return LDB_SUCCESS;
429 * record was read from the sub transaction cache, so we have
430 * already copied the primary cache record
432 if (!from_primary_cache) {
433 *list = *list2;
434 return LDB_SUCCESS;
438 * No index sub transaction active, so no need to cache a copy
440 if (ldb_kv->nested_idx_ptr == NULL) {
441 *list = *list2;
442 return LDB_SUCCESS;
446 * There is an active index sub transaction, and the record was
447 * found in the primary index transaction cache. A copy of the
448 * record needs be taken to prevent the original entry being
449 * altered, until the index sub transaction is committed.
453 struct ldb_val *dns = NULL;
454 size_t x = 0;
/* deep copy: the array is allocated on list and each value's data on the array */
456 dns = talloc_array(
457 list,
458 struct ldb_val,
459 list2->count);
460 if (dns == NULL) {
461 return LDB_ERR_OPERATIONS_ERROR;
463 for (x = 0; x < list2->count; x++) {
464 dns[x].length = list2->dn[x].length;
465 dns[x].data = talloc_memdup(
466 dns,
467 list2->dn[x].data,
468 list2->dn[x].length);
469 if (dns[x].data == NULL) {
470 TALLOC_FREE(dns);
471 return LDB_ERR_OPERATIONS_ERROR;
474 list->dn = dns;
475 list->count = list2->count;
477 return LDB_SUCCESS;
480 * Index record not found in the caches, read it from the
481 * database.
483 normal_index:
484 msg = ldb_msg_new(list);
485 if (msg == NULL) {
486 return LDB_ERR_OPERATIONS_ERROR;
489 ret = ldb_kv_search_dn1(module,
491 msg,
492 LDB_UNPACK_DATA_FLAG_NO_DN |
494 * The entry point ldb_kv_search_indexed is
495 * only called from the read-locked
496 * ldb_kv_search.
498 LDB_UNPACK_DATA_FLAG_READ_LOCKED);
499 if (ret != LDB_SUCCESS) {
500 talloc_free(msg);
501 return ret;
504 el = ldb_msg_find_element(msg, LDB_KV_IDX);
/* a record without an @IDX element is reported as success with the list left empty */
505 if (!el) {
506 talloc_free(msg);
507 return LDB_SUCCESS;
510 version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
513 * we avoid copying the strings by stealing the list. We have
514 * to steal msg onto el->values (which looks odd) because
515 * the memory is allocated on msg, not on each value.
517 if (ldb_kv->cache->GUID_index_attribute == NULL) {
518 /* check indexing version number */
519 if (version != LDB_KV_INDEXING_VERSION) {
520 ldb_debug_set(ldb_module_get_ctx(module),
521 LDB_DEBUG_ERROR,
522 "Wrong DN index version %d "
523 "expected %d for %s",
524 version, LDB_KV_INDEXING_VERSION,
525 ldb_dn_get_linearized(dn));
526 talloc_free(msg);
527 return LDB_ERR_OPERATIONS_ERROR;
530 talloc_steal(el->values, msg);
531 list->dn = talloc_steal(list, el->values);
532 list->count = el->num_values;
533 } else {
534 unsigned int i;
535 if (version != LDB_KV_GUID_INDEXING_VERSION) {
536 /* This is quite likely during the DB startup
537 on first upgrade to using a GUID index */
538 ldb_debug_set(ldb_module_get_ctx(module),
539 LDB_DEBUG_ERROR,
540 "Wrong GUID index version %d "
541 "expected %d for %s",
542 version, LDB_KV_GUID_INDEXING_VERSION,
543 ldb_dn_get_linearized(dn));
544 talloc_free(msg);
545 return LDB_ERR_OPERATIONS_ERROR;
548 if (el->num_values == 0) {
549 talloc_free(msg);
550 return LDB_ERR_OPERATIONS_ERROR;
/* the single @IDX value must be a whole number of LDB_KV_GUID_SIZE-byte GUIDs */
553 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
554 talloc_free(msg);
555 return LDB_ERR_OPERATIONS_ERROR;
558 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
559 list->dn = talloc_array(list, struct ldb_val, list->count);
560 if (list->dn == NULL) {
561 talloc_free(msg);
562 return LDB_ERR_OPERATIONS_ERROR;
566 * The actual data is on msg.
568 talloc_steal(list->dn, msg);
/* each entry points into the packed value rather than holding its own copy */
569 for (i = 0; i < list->count; i++) {
570 list->dn[i].data
571 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
572 list->dn[i].length = LDB_KV_GUID_SIZE;
576 /* We don't need msg->elements any more */
577 talloc_free(msg->elements);
578 return LDB_SUCCESS;
/*
 * Build the database key for 'dn' from its entry in the DN index.
 *
 * The index list for dn is fetched with ldb_kv_index_dn_base_dn() into
 * a temporary dn_list allocated on mem_ctx, and the selected entry is
 * converted with ldb_kv_guid_to_key() into ldb_key, whose memory the
 * caller provides.
 *
 * Returns:
 *   LDB_SUCCESS                  ldb_key has been filled in
 *   LDB_ERR_NO_SUCH_OBJECT       the index has no entry for dn, or the
 *                                key was truncated and no listed record
 *                                has this DN
 *   LDB_ERR_CONSTRAINT_VIOLATION more than one entry for a key that
 *                                was not truncated
 *   LDB_ERR_OPERATIONS_ERROR     allocation, record read or key
 *                                conversion failure
 *   or the error from ldb_kv_index_dn_base_dn() / ldb_kv_idx_to_key().
 */
581 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
582 struct ldb_kv_private *ldb_kv,
583 TALLOC_CTX *mem_ctx,
584 struct ldb_dn *dn,
585 struct ldb_val *ldb_key)
587 struct ldb_context *ldb = ldb_module_get_ctx(module);
588 int ret;
/* position in list->dn of the entry to convert; stays 0 unless the key was truncated */
589 int index = 0;
590 enum key_truncation truncation = KEY_NOT_TRUNCATED;
591 struct dn_list *list = talloc(mem_ctx, struct dn_list);
592 if (list == NULL) {
593 ldb_oom(ldb);
594 return LDB_ERR_OPERATIONS_ERROR;
597 ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
598 if (ret != LDB_SUCCESS) {
599 TALLOC_FREE(list);
600 return ret;
603 if (list->count == 0) {
604 TALLOC_FREE(list);
605 return LDB_ERR_NO_SUCH_OBJECT;
/* several entries are only accepted when the key was truncated */
608 if (list->count > 1 && truncation == KEY_NOT_TRUNCATED) {
609 const char *dn_str = ldb_dn_get_linearized(dn);
610 ldb_asprintf_errstring(ldb_module_get_ctx(module),
611 __location__
612 ": Failed to read DN index "
613 "against %s for %s: too many "
614 "values (%u > 1)",
615 ldb_kv->cache->GUID_index_attribute,
616 dn_str,
617 list->count);
618 TALLOC_FREE(list);
619 return LDB_ERR_CONSTRAINT_VIOLATION;
622 if (list->count > 0 && truncation == KEY_TRUNCATED) {
624 * DN key has been truncated, need to inspect the actual
625 * records to locate the actual DN
627 unsigned int i;
628 index = -1;
629 for (i=0; i < list->count; i++) {
630 uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
631 struct ldb_val key = {
632 .data = guid_key,
633 .length = sizeof(guid_key)
635 const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
636 struct ldb_message *rec = ldb_msg_new(ldb);
637 if (rec == NULL) {
638 TALLOC_FREE(list);
639 return LDB_ERR_OPERATIONS_ERROR;
642 ret = ldb_kv_idx_to_key(
643 module, ldb_kv, ldb, &list->dn[i], &key);
644 if (ret != LDB_SUCCESS) {
645 TALLOC_FREE(list);
646 TALLOC_FREE(rec);
647 return ret;
650 ret =
651 ldb_kv_search_key(module, ldb_kv, key, rec, flags);
/* key.data starts out as the stack buffer; it is only freed if ldb_kv_idx_to_key() replaced it */
652 if (key.data != guid_key) {
653 TALLOC_FREE(key.data);
655 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
657 * the record has disappeared?
658 * yes, this can happen
660 TALLOC_FREE(rec);
661 continue;
664 if (ret != LDB_SUCCESS) {
665 /* an internal error */
666 TALLOC_FREE(rec);
667 TALLOC_FREE(list);
668 return LDB_ERR_OPERATIONS_ERROR;
672 * We found the actual DN that we wanted from in the
673 * multiple values that matched the index
674 * (due to truncation), so return that.
677 if (ldb_dn_compare(dn, rec->dn) == 0) {
678 index = i;
679 TALLOC_FREE(rec);
680 break;
685 * We matched the index but the actual DN we wanted
686 * was not here.
688 if (index == -1) {
689 TALLOC_FREE(list);
690 return LDB_ERR_NO_SUCH_OBJECT;
694 /* The ldb_key memory is allocated by the caller */
695 ret = ldb_kv_guid_to_key(&list->dn[index], ldb_key);
696 TALLOC_FREE(list);
/* any failure from ldb_kv_guid_to_key() is reported as LDB_ERR_OPERATIONS_ERROR */
698 if (ret != LDB_SUCCESS) {
699 return LDB_ERR_OPERATIONS_ERROR;
702 return LDB_SUCCESS;
708 save a dn_list into a full @IDX style record
710 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
711 struct ldb_kv_private *ldb_kv,
712 struct ldb_dn *dn,
713 struct dn_list *list)
715 struct ldb_message *msg;
716 int ret;
718 msg = ldb_msg_new(module);
719 if (!msg) {
720 return ldb_module_oom(module);
723 msg->dn = dn;
725 if (list->count == 0) {
726 ret = ldb_kv_delete_noindex(module, msg);
727 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
728 ret = LDB_SUCCESS;
730 TALLOC_FREE(msg);
731 return ret;
734 if (ldb_kv->cache->GUID_index_attribute == NULL) {
735 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
736 LDB_KV_INDEXING_VERSION);
737 if (ret != LDB_SUCCESS) {
738 TALLOC_FREE(msg);
739 return ldb_module_oom(module);
741 } else {
742 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
743 LDB_KV_GUID_INDEXING_VERSION);
744 if (ret != LDB_SUCCESS) {
745 TALLOC_FREE(msg);
746 return ldb_module_oom(module);
750 if (list->count > 0) {
751 struct ldb_message_element *el;
753 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
754 if (ret != LDB_SUCCESS) {
755 TALLOC_FREE(msg);
756 return ldb_module_oom(module);
759 if (ldb_kv->cache->GUID_index_attribute == NULL) {
760 el->values = list->dn;
761 el->num_values = list->count;
762 } else {
763 struct ldb_val v;
764 unsigned int i;
765 el->values = talloc_array(msg,
766 struct ldb_val, 1);
767 if (el->values == NULL) {
768 TALLOC_FREE(msg);
769 return ldb_module_oom(module);
772 v.data = talloc_array_size(el->values,
773 list->count,
774 LDB_KV_GUID_SIZE);
775 if (v.data == NULL) {
776 TALLOC_FREE(msg);
777 return ldb_module_oom(module);
780 v.length = talloc_get_size(v.data);
782 for (i = 0; i < list->count; i++) {
783 if (list->dn[i].length !=
784 LDB_KV_GUID_SIZE) {
785 TALLOC_FREE(msg);
786 return ldb_module_operr(module);
788 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
789 list->dn[i].data,
790 LDB_KV_GUID_SIZE);
792 el->values[0] = v;
793 el->num_values = 1;
797 ret = ldb_kv_store(module, msg, TDB_REPLACE);
798 TALLOC_FREE(msg);
799 return ret;
803 save a dn_list into the database, in either @IDX or internal format
805 static int ldb_kv_dn_list_store(struct ldb_module *module,
806 struct ldb_dn *dn,
807 struct dn_list *list)
809 struct ldb_kv_private *ldb_kv = talloc_get_type(
810 ldb_module_get_private(module), struct ldb_kv_private);
811 TDB_DATA rec = {0};
812 TDB_DATA key = {0};
814 int ret = LDB_SUCCESS;
815 struct dn_list *list2 = NULL;
816 struct ldb_kv_idxptr *idxptr = NULL;
818 key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
819 if (key.dptr == NULL) {
820 return LDB_ERR_OPERATIONS_ERROR;
822 key.dsize = strlen((char *)key.dptr);
825 * If there is an index sub transaction active, update the
826 * sub transaction index cache. Otherwise update the
827 * primary index cache
829 if (ldb_kv->nested_idx_ptr != NULL) {
830 idxptr = ldb_kv->nested_idx_ptr;
831 } else {
832 idxptr = ldb_kv->idxptr;
835 * Get the cache entry for the index
837 * As the value in the cache is a pointer to a dn_list we update
838 * the dn_list directly.
841 rec = tdb_fetch(idxptr->itdb, key);
842 if (rec.dptr != NULL) {
843 list2 = ldb_kv_index_idxptr(module, rec);
844 if (list2 == NULL) {
845 free(rec.dptr);
846 return LDB_ERR_OPERATIONS_ERROR;
848 free(rec.dptr);
849 /* Now put the updated pointer back in the cache */
850 if (list->dn == NULL) {
851 list2->dn = NULL;
852 list2->count = 0;
853 } else {
854 list2->dn = talloc_steal(list2, list->dn);
855 list2->count = list->count;
857 return LDB_SUCCESS;
860 list2 = talloc(idxptr, struct dn_list);
861 if (list2 == NULL) {
862 return LDB_ERR_OPERATIONS_ERROR;
864 list2->dn = talloc_steal(list2, list->dn);
865 list2->count = list->count;
867 rec.dptr = (uint8_t *)&list2;
868 rec.dsize = sizeof(void *);
872 * This is not a store into the main DB, but into an in-memory
873 * TDB, so we don't need a guard on ltdb->read_only
875 * Also as we directly update the in memory dn_list for existing
876 * cache entries we must be adding a new entry to the cache.
878 ret = tdb_store(idxptr->itdb, key, rec, TDB_INSERT);
879 if (ret != 0) {
880 return ltdb_err_map( tdb_error(idxptr->itdb));
882 return LDB_SUCCESS;
886 traverse function for storing the in-memory index entries on disk
888 static int ldb_kv_index_traverse_store(_UNUSED_ struct tdb_context *tdb,
889 TDB_DATA key,
890 TDB_DATA data,
891 void *state)
893 struct ldb_module *module = state;
894 struct ldb_kv_private *ldb_kv = talloc_get_type(
895 ldb_module_get_private(module), struct ldb_kv_private);
896 struct ldb_dn *dn;
897 struct ldb_context *ldb = ldb_module_get_ctx(module);
898 struct ldb_val v;
899 struct dn_list *list;
901 list = ldb_kv_index_idxptr(module, data);
902 if (list == NULL) {
903 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
904 return -1;
907 v.data = key.dptr;
908 v.length = strnlen((char *)key.dptr, key.dsize);
910 dn = ldb_dn_from_ldb_val(module, ldb, &v);
911 if (dn == NULL) {
912 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
913 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
914 return -1;
917 ldb_kv->idxptr->error =
918 ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
919 talloc_free(dn);
920 if (ldb_kv->idxptr->error != 0) {
921 return -1;
923 return 0;
926 /* cleanup the idxptr mode when transaction commits */
927 int ldb_kv_index_transaction_commit(struct ldb_module *module)
929 struct ldb_kv_private *ldb_kv = talloc_get_type(
930 ldb_module_get_private(module), struct ldb_kv_private);
931 int ret;
933 struct ldb_context *ldb = ldb_module_get_ctx(module);
935 ldb_reset_err_string(ldb);
937 if (ldb_kv->idxptr->itdb) {
938 tdb_traverse(
939 ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
940 tdb_close(ldb_kv->idxptr->itdb);
943 ret = ldb_kv->idxptr->error;
944 if (ret != LDB_SUCCESS) {
945 if (!ldb_errstring(ldb)) {
946 ldb_set_errstring(ldb, ldb_strerror(ret));
948 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
951 talloc_free(ldb_kv->idxptr);
952 ldb_kv->idxptr = NULL;
953 return ret;
956 /* cleanup the idxptr mode when transaction cancels */
957 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
959 struct ldb_kv_private *ldb_kv = talloc_get_type(
960 ldb_module_get_private(module), struct ldb_kv_private);
961 if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
962 tdb_close(ldb_kv->idxptr->itdb);
964 TALLOC_FREE(ldb_kv->idxptr);
965 if (ldb_kv->nested_idx_ptr && ldb_kv->nested_idx_ptr->itdb) {
966 tdb_close(ldb_kv->nested_idx_ptr->itdb);
968 TALLOC_FREE(ldb_kv->nested_idx_ptr);
969 return LDB_SUCCESS;
/*
 * Build the DN of the index record for (attr, value).
 *
 * attr:       attribute name; names starting with '@' are used as given
 *             and the value is not canonicalised, other names are
 *             casefolded and the value goes through the attribute's
 *             index_format_fn or canonicalise_fn
 * value:      value to index, or NULL for a key with no value
 * ap:         if not NULL, receives the schema attribute (NULL for '@'
 *             attributes)
 * truncation: set to KEY_TRUNCATED or KEY_NOT_TRUNCATED when a key is
 *             built; it is written unconditionally, so it must not be
 *             NULL
 *
 * Keys use ':' separators when they fit and '#' separators when the
 * value had to be truncated; a doubled separator before the value marks
 * a base64 encoded value.
 *
 * Returns the new DN (allocated on ldb), or NULL on failure.
 */
974 return the dn key to be used for an index
975 the caller is responsible for freeing
977 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
978 struct ldb_kv_private *ldb_kv,
979 const char *attr,
980 const struct ldb_val *value,
981 const struct ldb_schema_attribute **ap,
982 enum key_truncation *truncation)
984 struct ldb_dn *ret;
985 struct ldb_val v;
986 const struct ldb_schema_attribute *a = NULL;
987 char *attr_folded = NULL;
988 const char *attr_for_dn = NULL;
989 int r;
990 bool should_b64_encode;
992 unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
993 size_t key_len = 0;
994 size_t attr_len = 0;
995 const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
996 unsigned frmt_len = 0;
997 const size_t additional_key_length = 4;
998 unsigned int num_separators = 3; /* Estimate for overflow check */
999 const size_t min_data = 1;
1000 const size_t min_key_length = additional_key_length
1001 + indx_len + num_separators + min_data;
1002 struct ldb_val empty;
1005 * Accept a NULL value as a request for a key with no value. This is
1006 * different from passing an empty value, which might be given
1007 * significance by some canonicalise functions.
1009 bool empty_val = value == NULL;
1010 if (empty_val) {
1011 empty.length = 0;
1012 empty.data = discard_const_p(unsigned char, "");
1013 value = &empty;
1016 if (attr[0] == '@') {
1017 attr_for_dn = attr;
1018 v = *value;
1019 if (ap != NULL) {
1020 *ap = NULL;
1022 } else {
1023 attr_folded = ldb_attr_casefold(ldb, attr);
1024 if (!attr_folded) {
1025 return NULL;
1028 attr_for_dn = attr_folded;
1030 a = ldb_schema_attribute_by_name(ldb, attr);
1031 if (ap) {
1032 *ap = a;
1035 if (empty_val) {
1036 v = *value;
1037 } else {
1038 ldb_attr_handler_t fn;
/* index_format_fn is only used when a GUID index attribute is configured */
1039 if (a->syntax->index_format_fn &&
1040 ldb_kv->cache->GUID_index_attribute != NULL) {
1041 fn = a->syntax->index_format_fn;
1042 } else {
1043 fn = a->syntax->canonicalise_fn;
1045 r = fn(ldb, ldb, value, &v);
1046 if (r != LDB_SUCCESS) {
1047 const char *errstr = ldb_errstring(ldb);
1048 /* canonicalisation can be refused. For
1049 example, a attribute that takes wildcards
1050 will refuse to canonicalise if the value
1051 contains a wildcard */
1052 ldb_asprintf_errstring(ldb,
1053 "Failed to create "
1054 "index key for "
1055 "attribute '%s':%s%s%s",
1056 attr, ldb_strerror(r),
1057 (errstr?":":""),
1058 (errstr?errstr:""));
1059 talloc_free(attr_folded);
1060 return NULL;
1064 attr_len = strlen(attr_for_dn);
1067 * Check if there is any hope this will fit into the DB.
1068 * Overflow here is not actually critical the code below
1069 * checks again to make the printf and the DB does another
1070 * check for too long keys
/* NOTE(review): this early return frees attr_folded but not a canonicalised v.data - confirm whether that allocation is leaked on ldb */
1072 if (max_key_length - attr_len < min_key_length) {
1073 ldb_asprintf_errstring(
1074 ldb,
1075 __location__ ": max_key_length "
1076 "is too small (%u) < (%u)",
1077 max_key_length,
1078 (unsigned)(min_key_length + attr_len));
1079 talloc_free(attr_folded);
1080 return NULL;
1084 * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
1085 * "DN=" and a trailing string terminator
1087 max_key_length -= additional_key_length;
1090 * We do not base 64 encode a DN in a key, it has already been
1091 * casefold and lineraized, that is good enough. That already
1092 * avoids embedded NUL etc.
1094 if (ldb_kv->cache->GUID_index_attribute != NULL) {
1095 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
1096 should_b64_encode = false;
1097 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
1099 * We can only change the behaviour for IDXONE
1100 * when the GUID index is enabled
1102 should_b64_encode = false;
1103 } else {
1104 should_b64_encode
1105 = ldb_should_b64_encode(ldb, &v);
1107 } else {
1108 should_b64_encode = ldb_should_b64_encode(ldb, &v);
1111 if (should_b64_encode) {
1112 size_t vstr_len = 0;
1113 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
/* NOTE(review): as above, a canonicalised v.data does not appear to be freed on this failure path */
1114 if (!vstr) {
1115 talloc_free(attr_folded);
1116 return NULL;
1118 vstr_len = strlen(vstr);
1120 * Overflow here is not critical as we only use this
1121 * to choose the printf truncation
1123 key_len = num_separators + indx_len + attr_len + vstr_len;
1124 if (key_len > max_key_length) {
1125 size_t excess = key_len - max_key_length;
1126 frmt_len = vstr_len - excess;
1127 *truncation = KEY_TRUNCATED;
1129 * Truncated keys are placed in a separate key space
1130 * from the non truncated keys
1131 * Note: the double hash "##" is not a typo and
1132 * indicates that the following value is base64 encoded
1134 ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
1135 LDB_KV_INDEX, attr_for_dn,
1136 frmt_len, vstr);
1137 } else {
1138 frmt_len = vstr_len;
1139 *truncation = KEY_NOT_TRUNCATED;
1141 * Note: the double colon "::" is not a typo and
1142 * indicates that the following value is base64 encoded
1144 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1145 LDB_KV_INDEX, attr_for_dn,
1146 frmt_len, vstr);
1148 talloc_free(vstr);
1149 } else {
1150 /* Only need two seperators */
1151 num_separators = 2;
1154 * Overflow here is not critical as we only use this
1155 * to choose the printf truncation
1157 key_len = num_separators + indx_len + attr_len + (int)v.length;
1158 if (key_len > max_key_length) {
1159 size_t excess = key_len - max_key_length;
1160 frmt_len = v.length - excess;
1161 *truncation = KEY_TRUNCATED;
1163 * Truncated keys are placed in a separate key space
1164 * from the non truncated keys
1166 ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1167 LDB_KV_INDEX, attr_for_dn,
1168 frmt_len, (char *)v.data);
1169 } else {
1170 frmt_len = v.length;
1171 *truncation = KEY_NOT_TRUNCATED;
1172 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1173 LDB_KV_INDEX, attr_for_dn,
1174 frmt_len, (char *)v.data);
/* v.data is only freed when it is not the caller's (or the local empty) buffer */
1178 if (value != NULL && v.data != value->data && !empty_val) {
1179 talloc_free(v.data);
1181 talloc_free(attr_folded);
1183 return ret;
1187 see if a attribute value is in the list of indexed attributes
1189 static bool ldb_kv_is_indexed(struct ldb_module *module,
1190 struct ldb_kv_private *ldb_kv,
1191 const char *attr)
1193 struct ldb_context *ldb = ldb_module_get_ctx(module);
1194 unsigned int i;
1195 struct ldb_message_element *el;
1197 if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1198 (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1199 /* Implicity covered, this is the index key */
1200 return false;
1202 if (ldb->schema.index_handler_override) {
1203 const struct ldb_schema_attribute *a
1204 = ldb_schema_attribute_by_name(ldb, attr);
1206 if (a == NULL) {
1207 return false;
1210 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1211 return true;
1212 } else {
1213 return false;
1217 if (!ldb_kv->cache->attribute_indexes) {
1218 return false;
1221 el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1222 if (el == NULL) {
1223 return false;
1226 /* TODO: this is too expensive! At least use a binary search */
1227 for (i=0; i<el->num_values; i++) {
1228 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1229 return true;
1232 return false;
1236 in the following logic functions, the return value is treated as
1237 follows:
1239 LDB_SUCCESS: we found some matching index values
1241 LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1243 LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1244 we'll need a full search
1248 return a list of dn's that might match a simple indexed search (an
1249 equality search only)
1251 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1252 struct ldb_kv_private *ldb_kv,
1253 const struct ldb_parse_tree *tree,
1254 struct dn_list *list)
1256 struct ldb_context *ldb;
1257 struct ldb_dn *dn;
1258 int ret;
1259 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1261 ldb = ldb_module_get_ctx(module);
1263 list->count = 0;
1264 list->dn = NULL;
1266 /* if the attribute isn't in the list of indexed attributes then
1267 this node needs a full search */
1268 if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1269 return LDB_ERR_OPERATIONS_ERROR;
1272 /* the attribute is indexed. Pull the list of DNs that match the
1273 search criterion */
1274 dn = ldb_kv_index_key(ldb,
1275 ldb_kv,
1276 tree->u.equality.attr,
1277 &tree->u.equality.value,
1278 NULL,
1279 &truncation);
1281 * We ignore truncation here and allow multi-valued matches
1282 * as ltdb_search_indexed will filter out the wrong one in
1283 * ltdb_index_filter() which calls ldb_match_message().
1285 if (!dn) {
1286 return LDB_ERR_OPERATIONS_ERROR;
1289 ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list,
1290 DN_LIST_WILL_BE_READ_ONLY);
1291 talloc_free(dn);
1292 return ret;
1295 static bool list_union(struct ldb_context *ldb,
1296 struct ldb_kv_private *ldb_kv,
1297 struct dn_list *list,
1298 struct dn_list *list2);
1301 return a list of dn's that might match a leaf indexed search
1303 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1304 struct ldb_kv_private *ldb_kv,
1305 const struct ldb_parse_tree *tree,
1306 struct dn_list *list)
1308 if (ldb_kv->disallow_dn_filter &&
1309 (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1310 /* in AD mode we do not support "(dn=...)" search filters */
1311 list->dn = NULL;
1312 list->count = 0;
1313 return LDB_SUCCESS;
1315 if (tree->u.equality.attr[0] == '@') {
1316 /* Do not allow a indexed search against an @ */
1317 list->dn = NULL;
1318 list->count = 0;
1319 return LDB_SUCCESS;
1321 if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1322 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1323 bool valid_dn = false;
1324 struct ldb_dn *dn
1325 = ldb_dn_from_ldb_val(list,
1326 ldb_module_get_ctx(module),
1327 &tree->u.equality.value);
1328 if (dn == NULL) {
1329 /* If we can't parse it, no match */
1330 list->dn = NULL;
1331 list->count = 0;
1332 return LDB_SUCCESS;
1335 valid_dn = ldb_dn_validate(dn);
1336 if (valid_dn == false) {
1337 /* If we can't parse it, no match */
1338 list->dn = NULL;
1339 list->count = 0;
1340 return LDB_SUCCESS;
1344 * Re-use the same code we use for a SCOPE_BASE
1345 * search
1347 * We can't call TALLOC_FREE(dn) as this must belong
1348 * to list for the memory to remain valid.
1350 return ldb_kv_index_dn_base_dn(
1351 module, ldb_kv, dn, list, &truncation);
1353 * We ignore truncation here and allow multi-valued matches
1354 * as ltdb_search_indexed will filter out the wrong one in
1355 * ltdb_index_filter() which calls ldb_match_message().
1358 } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1359 (ldb_attr_cmp(tree->u.equality.attr,
1360 ldb_kv->cache->GUID_index_attribute) == 0)) {
1361 int ret;
1362 struct ldb_context *ldb = ldb_module_get_ctx(module);
1363 list->dn = talloc_array(list, struct ldb_val, 1);
1364 if (list->dn == NULL) {
1365 ldb_module_oom(module);
1366 return LDB_ERR_OPERATIONS_ERROR;
1369 * We need to go via the canonicalise_fn() to
1370 * ensure we get the index in binary, rather
1371 * than a string
1373 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1374 ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1375 if (ret != LDB_SUCCESS) {
1376 return LDB_ERR_OPERATIONS_ERROR;
1378 list->count = 1;
1379 return LDB_SUCCESS;
1382 return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1387 list intersection
1388 list = list & list2
1390 static bool list_intersect(struct ldb_kv_private *ldb_kv,
1391 struct dn_list *list,
1392 const struct dn_list *list2)
1394 const struct dn_list *short_list, *long_list;
1395 struct dn_list *list3;
1396 unsigned int i;
1398 if (list->count == 0) {
1399 /* 0 & X == 0 */
1400 return true;
1402 if (list2->count == 0) {
1403 /* X & 0 == 0 */
1404 list->count = 0;
1405 list->dn = NULL;
1406 return true;
1410 * In both of the below we check for strict and in that
1411 * case do not optimise the intersection of this list,
1412 * we must never return an entry not in this
1413 * list. This allows the index for
1414 * SCOPE_ONELEVEL to be trusted.
1417 /* the indexing code is allowed to return a longer list than
1418 what really matches, as all results are filtered by the
1419 full expression at the end - this shortcut avoids a lot of
1420 work in some cases */
1421 if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1422 return true;
1424 if (list2->count < 2 && list->count > 10 && list->strict == false) {
1425 list->count = list2->count;
1426 list->dn = list2->dn;
1427 /* note that list2 may not be the parent of list2->dn,
1428 as list2->dn may be owned by ltdb->idxptr. In that
1429 case we expect this reparent call to fail, which is
1430 OK */
1431 talloc_reparent(list2, list, list2->dn);
1432 return true;
1435 if (list->count > list2->count) {
1436 short_list = list2;
1437 long_list = list;
1438 } else {
1439 short_list = list;
1440 long_list = list2;
1443 list3 = talloc_zero(list, struct dn_list);
1444 if (list3 == NULL) {
1445 return false;
1448 list3->dn = talloc_array(list3, struct ldb_val,
1449 MIN(list->count, list2->count));
1450 if (!list3->dn) {
1451 talloc_free(list3);
1452 return false;
1454 list3->count = 0;
1456 for (i=0;i<short_list->count;i++) {
1457 /* For the GUID index case, this is a binary search */
1458 if (ldb_kv_dn_list_find_val(
1459 ldb_kv, long_list, &short_list->dn[i]) != -1) {
1460 list3->dn[list3->count] = short_list->dn[i];
1461 list3->count++;
1465 list->strict |= list2->strict;
1466 list->dn = talloc_steal(list, list3->dn);
1467 list->count = list3->count;
1468 talloc_free(list3);
1470 return true;
1475 list union
1476 list = list | list2
1478 static bool list_union(struct ldb_context *ldb,
1479 struct ldb_kv_private *ldb_kv,
1480 struct dn_list *list,
1481 struct dn_list *list2)
1483 struct ldb_val *dn3;
1484 unsigned int i = 0, j = 0, k = 0;
1486 if (list2->count == 0) {
1487 /* X | 0 == X */
1488 return true;
1491 if (list->count == 0) {
1492 /* 0 | X == X */
1493 list->count = list2->count;
1494 list->dn = list2->dn;
1495 /* note that list2 may not be the parent of list2->dn,
1496 as list2->dn may be owned by ltdb->idxptr. In that
1497 case we expect this reparent call to fail, which is
1498 OK */
1499 talloc_reparent(list2, list, list2->dn);
1500 return true;
1504 * Sort the lists (if not in GUID DN mode) so we can do
1505 * the de-duplication during the merge
1507 * NOTE: This can sort the in-memory index values, as list or
1508 * list2 might not be a copy!
1510 ldb_kv_dn_list_sort(ldb_kv, list);
1511 ldb_kv_dn_list_sort(ldb_kv, list2);
1513 dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1514 if (!dn3) {
1515 ldb_oom(ldb);
1516 return false;
1519 while (i < list->count || j < list2->count) {
1520 int cmp;
1521 if (i >= list->count) {
1522 cmp = 1;
1523 } else if (j >= list2->count) {
1524 cmp = -1;
1525 } else {
1526 cmp = ldb_val_equal_exact_ordered(list->dn[i],
1527 &list2->dn[j]);
1530 if (cmp < 0) {
1531 /* Take list */
1532 dn3[k] = list->dn[i];
1533 i++;
1534 k++;
1535 } else if (cmp > 0) {
1536 /* Take list2 */
1537 dn3[k] = list2->dn[j];
1538 j++;
1539 k++;
1540 } else {
1541 /* Equal, take list */
1542 dn3[k] = list->dn[i];
1543 i++;
1544 j++;
1545 k++;
1549 list->dn = dn3;
1550 list->count = k;
1552 return true;
1555 static int ldb_kv_index_dn(struct ldb_module *module,
1556 struct ldb_kv_private *ldb_kv,
1557 const struct ldb_parse_tree *tree,
1558 struct dn_list *list);
1561 process an OR list (a union)
1563 static int ldb_kv_index_dn_or(struct ldb_module *module,
1564 struct ldb_kv_private *ldb_kv,
1565 const struct ldb_parse_tree *tree,
1566 struct dn_list *list)
1568 struct ldb_context *ldb;
1569 unsigned int i;
1571 ldb = ldb_module_get_ctx(module);
1573 list->dn = NULL;
1574 list->count = 0;
1576 for (i=0; i<tree->u.list.num_elements; i++) {
1577 struct dn_list *list2;
1578 int ret;
1580 list2 = talloc_zero(list, struct dn_list);
1581 if (list2 == NULL) {
1582 return LDB_ERR_OPERATIONS_ERROR;
1585 ret = ldb_kv_index_dn(
1586 module, ldb_kv, tree->u.list.elements[i], list2);
1588 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1589 /* X || 0 == X */
1590 talloc_free(list2);
1591 continue;
1594 if (ret != LDB_SUCCESS) {
1595 /* X || * == * */
1596 talloc_free(list2);
1597 return ret;
1600 if (!list_union(ldb, ldb_kv, list, list2)) {
1601 talloc_free(list2);
1602 return LDB_ERR_OPERATIONS_ERROR;
1606 if (list->count == 0) {
1607 return LDB_ERR_NO_SUCH_OBJECT;
1610 return LDB_SUCCESS;
1615 NOT an index results
1617 static int ldb_kv_index_dn_not(_UNUSED_ struct ldb_module *module,
1618 _UNUSED_ struct ldb_kv_private *ldb_kv,
1619 _UNUSED_ const struct ldb_parse_tree *tree,
1620 _UNUSED_ struct dn_list *list)
1622 /* the only way to do an indexed not would be if we could
1623 negate the not via another not or if we knew the total
1624 number of database elements so we could know that the
1625 existing expression covered the whole database.
1627 instead, we just give up, and rely on a full index scan
1628 (unless an outer & manages to reduce the list)
1630 return LDB_ERR_OPERATIONS_ERROR;
1634 * These things are unique, so avoid a full scan if this is a search
1635 * by GUID, DN or a unique attribute
1637 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1638 struct ldb_kv_private *ldb_kv,
1639 const char *attr)
1641 const struct ldb_schema_attribute *a;
1642 if (ldb_kv->cache->GUID_index_attribute != NULL) {
1643 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1644 0) {
1645 return true;
1648 if (ldb_attr_dn(attr) == 0) {
1649 return true;
1652 a = ldb_schema_attribute_by_name(ldb, attr);
1653 if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1654 return true;
1656 return false;
1660 process an AND expression (intersection)
1662 static int ldb_kv_index_dn_and(struct ldb_module *module,
1663 struct ldb_kv_private *ldb_kv,
1664 const struct ldb_parse_tree *tree,
1665 struct dn_list *list)
1667 struct ldb_context *ldb;
1668 unsigned int i;
1669 bool found;
1671 ldb = ldb_module_get_ctx(module);
1673 list->dn = NULL;
1674 list->count = 0;
1676 /* in the first pass we only look for unique simple
1677 equality tests, in the hope of avoiding having to look
1678 at any others */
1679 for (i=0; i<tree->u.list.num_elements; i++) {
1680 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1681 int ret;
1683 if (subtree->operation != LDB_OP_EQUALITY ||
1684 !ldb_kv_index_unique(
1685 ldb, ldb_kv, subtree->u.equality.attr)) {
1686 continue;
1689 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1690 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1691 /* 0 && X == 0 */
1692 return LDB_ERR_NO_SUCH_OBJECT;
1694 if (ret == LDB_SUCCESS) {
1695 /* a unique index match means we can
1696 * stop. Note that we don't care if we return
1697 * a few too many objects, due to later
1698 * filtering */
1699 return LDB_SUCCESS;
1703 /* now do a full intersection */
1704 found = false;
1706 for (i=0; i<tree->u.list.num_elements; i++) {
1707 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1708 struct dn_list *list2;
1709 int ret;
1711 list2 = talloc_zero(list, struct dn_list);
1712 if (list2 == NULL) {
1713 return ldb_module_oom(module);
1716 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1718 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1719 /* X && 0 == 0 */
1720 list->dn = NULL;
1721 list->count = 0;
1722 talloc_free(list2);
1723 return LDB_ERR_NO_SUCH_OBJECT;
1726 if (ret != LDB_SUCCESS) {
1727 /* this didn't adding anything */
1728 talloc_free(list2);
1729 continue;
1732 if (!found) {
1733 talloc_reparent(list2, list, list->dn);
1734 list->dn = list2->dn;
1735 list->count = list2->count;
1736 found = true;
1737 } else if (!list_intersect(ldb_kv, list, list2)) {
1738 talloc_free(list2);
1739 return LDB_ERR_OPERATIONS_ERROR;
1742 if (list->count == 0) {
1743 list->dn = NULL;
1744 return LDB_ERR_NO_SUCH_OBJECT;
1747 if (list->count < 2) {
1748 /* it isn't worth loading the next part of the tree */
1749 return LDB_SUCCESS;
1753 if (!found) {
1754 /* none of the attributes were indexed */
1755 return LDB_ERR_OPERATIONS_ERROR;
1758 return LDB_SUCCESS;
/*
 * State handed to traverse_range_index() through its opaque 'state'
 * argument while a range of index records is being walked.
 */
struct ldb_kv_ordered_index_context {
	/* Module used to reach the ldb context and to allocate on */
	struct ldb_module *module;
	/* LDB error code recorded by the traverse callback */
	int error;
	/* List the traverse callback appends matching values to */
	struct dn_list *dn_list;
};
1767 static int traverse_range_index(_UNUSED_ struct ldb_kv_private *ldb_kv,
1768 _UNUSED_ struct ldb_val key,
1769 struct ldb_val data,
1770 void *state)
1773 struct ldb_context *ldb;
1774 struct ldb_kv_ordered_index_context *ctx =
1775 (struct ldb_kv_ordered_index_context *)state;
1776 struct ldb_module *module = ctx->module;
1777 struct ldb_message_element *el = NULL;
1778 struct ldb_message *msg = NULL;
1779 int version;
1780 size_t dn_array_size, additional_length;
1781 unsigned int i;
1783 ldb = ldb_module_get_ctx(module);
1785 msg = ldb_msg_new(module);
1787 ctx->error = ldb_unpack_data_flags(ldb, &data, msg,
1788 LDB_UNPACK_DATA_FLAG_NO_DN);
1790 if (ctx->error != LDB_SUCCESS) {
1791 talloc_free(msg);
1792 return ctx->error;
1795 el = ldb_msg_find_element(msg, LDB_KV_IDX);
1796 if (!el) {
1797 talloc_free(msg);
1798 return LDB_SUCCESS;
1801 version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
1804 * we avoid copying the strings by stealing the list. We have
1805 * to steal msg onto el->values (which looks odd) because
1806 * the memory is allocated on msg, not on each value.
1808 if (version != LDB_KV_GUID_INDEXING_VERSION) {
1809 /* This is quite likely during the DB startup
1810 on first upgrade to using a GUID index */
1811 ldb_debug_set(ldb_module_get_ctx(module),
1812 LDB_DEBUG_ERROR, __location__
1813 ": Wrong GUID index version %d expected %d",
1814 version, LDB_KV_GUID_INDEXING_VERSION);
1815 talloc_free(msg);
1816 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1817 return ctx->error;
1820 if (el->num_values == 0) {
1821 talloc_free(msg);
1822 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1823 return ctx->error;
1826 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0
1827 || el->values[0].length == 0) {
1828 talloc_free(msg);
1829 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1830 return ctx->error;
1833 dn_array_size = talloc_array_length(ctx->dn_list->dn);
1835 additional_length = el->values[0].length / LDB_KV_GUID_SIZE;
1837 if (ctx->dn_list->count + additional_length < ctx->dn_list->count) {
1838 talloc_free(msg);
1839 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1840 return ctx->error;
1843 if ((ctx->dn_list->count + additional_length) >= dn_array_size) {
1844 size_t new_array_length;
1846 if (dn_array_size * 2 < dn_array_size) {
1847 talloc_free(msg);
1848 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1849 return ctx->error;
1852 new_array_length = MAX(ctx->dn_list->count + additional_length,
1853 dn_array_size * 2);
1855 ctx->dn_list->dn = talloc_realloc(ctx->dn_list,
1856 ctx->dn_list->dn,
1857 struct ldb_val,
1858 new_array_length);
1861 if (ctx->dn_list->dn == NULL) {
1862 talloc_free(msg);
1863 ctx->error = LDB_ERR_OPERATIONS_ERROR;
1864 return ctx->error;
1868 * The actual data is on msg.
1870 talloc_steal(ctx->dn_list->dn, msg);
1871 for (i = 0; i < additional_length; i++) {
1872 ctx->dn_list->dn[i + ctx->dn_list->count].data
1873 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
1874 ctx->dn_list->dn[i + ctx->dn_list->count].length = LDB_KV_GUID_SIZE;
1878 ctx->dn_list->count += additional_length;
1880 talloc_free(msg->elements);
1882 return LDB_SUCCESS;
1886 * >= and <= indexing implemented using lexicographically sorted keys
1888 * We only run this in GUID indexing mode and when there is no write
1889 * transaction (only implicit read locks are being held). Otherwise, we would
1890 * have to deal with the in-memory index cache.
1892 * We rely on the implementation of index_format_fn on a schema syntax which
1893 * will can help us to construct keys which can be ordered correctly, and we
1894 * terminate using schema agnostic start and end keys.
1896 * index_format_fn must output values which can be memcmp-able to produce the
1897 * correct ordering as defined by the schema syntax class.
1899 static int ldb_kv_index_dn_ordered(struct ldb_module *module,
1900 struct ldb_kv_private *ldb_kv,
1901 const struct ldb_parse_tree *tree,
1902 struct dn_list *list, bool ascending)
1904 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1905 struct ldb_context *ldb = ldb_module_get_ctx(module);
1907 struct ldb_val ldb_key = { 0 }, ldb_key2 = { 0 };
1908 struct ldb_val start_key, end_key;
1909 struct ldb_dn *key_dn = NULL;
1910 const struct ldb_schema_attribute *a = NULL;
1912 struct ldb_kv_ordered_index_context ctx;
1913 int ret;
1915 TALLOC_CTX *tmp_ctx = NULL;
1917 if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.comparison.attr)) {
1918 return LDB_ERR_OPERATIONS_ERROR;
1921 if (ldb_kv->cache->GUID_index_attribute == NULL) {
1922 return LDB_ERR_OPERATIONS_ERROR;
1925 /* bail out if we're in a transaction, full search instead. */
1926 if (ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1927 return LDB_ERR_OPERATIONS_ERROR;
1930 if (ldb_kv->disallow_dn_filter &&
1931 (ldb_attr_cmp(tree->u.comparison.attr, "dn") == 0)) {
1932 /* in AD mode we do not support "(dn=...)" search filters */
1933 list->dn = NULL;
1934 list->count = 0;
1935 return LDB_SUCCESS;
1937 if (tree->u.comparison.attr[0] == '@') {
1938 /* Do not allow a indexed search against an @ */
1939 list->dn = NULL;
1940 list->count = 0;
1941 return LDB_SUCCESS;
1944 a = ldb_schema_attribute_by_name(ldb, tree->u.comparison.attr);
1947 * If there's no index format function defined for this attr, then
1948 * the lexicographic order in the database doesn't correspond to the
1949 * attr's ordering, so we can't use the iterate_range op.
1951 if (a->syntax->index_format_fn == NULL) {
1952 return LDB_ERR_OPERATIONS_ERROR;
1955 tmp_ctx = talloc_new(NULL);
1956 if (tmp_ctx == NULL) {
1957 return ldb_module_oom(module);
1960 key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1961 &tree->u.comparison.value,
1962 NULL, &truncation);
1963 if (!key_dn) {
1964 TALLOC_FREE(tmp_ctx);
1965 return LDB_ERR_OPERATIONS_ERROR;
1966 } else if (truncation == KEY_TRUNCATED) {
1967 ldb_debug(ldb, LDB_DEBUG_WARNING,
1968 __location__
1969 ": ordered index violation: key dn truncated: %s\n",
1970 ldb_dn_get_linearized(key_dn));
1971 TALLOC_FREE(tmp_ctx);
1972 return LDB_ERR_OPERATIONS_ERROR;
1974 ldb_key = ldb_kv_key_dn(tmp_ctx, key_dn);
1975 talloc_free(key_dn);
1976 if (ldb_key.data == NULL) {
1977 TALLOC_FREE(tmp_ctx);
1978 return LDB_ERR_OPERATIONS_ERROR;
1981 key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
1982 NULL, NULL, &truncation);
1983 if (!key_dn) {
1984 TALLOC_FREE(tmp_ctx);
1985 return LDB_ERR_OPERATIONS_ERROR;
1986 } else if (truncation == KEY_TRUNCATED) {
1987 ldb_debug(ldb, LDB_DEBUG_WARNING,
1988 __location__
1989 ": ordered index violation: key dn truncated: %s\n",
1990 ldb_dn_get_linearized(key_dn));
1991 TALLOC_FREE(tmp_ctx);
1992 return LDB_ERR_OPERATIONS_ERROR;
1995 ldb_key2 = ldb_kv_key_dn(tmp_ctx, key_dn);
1996 talloc_free(key_dn);
1997 if (ldb_key2.data == NULL) {
1998 TALLOC_FREE(tmp_ctx);
1999 return LDB_ERR_OPERATIONS_ERROR;
2003 * In order to avoid defining a start and end key for the search, we
2004 * notice that each index key is of the form:
2006 * DN=@INDEX:<ATTRIBUTE>:<VALUE>\0.
2008 * We can simply make our start key DN=@INDEX:<ATTRIBUTE>: and our end
2009 * key DN=@INDEX:<ATTRIBUTE>; to return all index entries for a
2010 * particular attribute.
2012 * Our LMDB backend uses the default memcmp for key comparison.
2015 /* Eliminate NUL byte at the end of the empty key */
2016 ldb_key2.length--;
2018 if (ascending) {
2019 /* : becomes ; for pseudo end-key */
2020 ldb_key2.data[ldb_key2.length-1]++;
2021 start_key = ldb_key;
2022 end_key = ldb_key2;
2023 } else {
2024 start_key = ldb_key2;
2025 end_key = ldb_key;
2028 ctx.module = module;
2029 ctx.error = 0;
2030 ctx.dn_list = list;
2031 ctx.dn_list->count = 0;
2032 ctx.dn_list->dn = talloc_zero_array(ctx.dn_list, struct ldb_val, 2);
2034 ret = ldb_kv->kv_ops->iterate_range(ldb_kv, start_key, end_key,
2035 traverse_range_index, &ctx);
2037 if (ret != LDB_SUCCESS || ctx.error != LDB_SUCCESS) {
2038 TALLOC_FREE(tmp_ctx);
2039 return LDB_ERR_OPERATIONS_ERROR;
2042 TYPESAFE_QSORT(ctx.dn_list->dn, ctx.dn_list->count,
2043 ldb_val_equal_exact_for_qsort);
2045 TALLOC_FREE(tmp_ctx);
2047 return LDB_SUCCESS;
/*
 * Indexed ">=" search: the ascending form of
 * ldb_kv_index_dn_ordered().
 */
static int ldb_kv_index_dn_greater(struct ldb_module *module,
				   struct ldb_kv_private *ldb_kv,
				   const struct ldb_parse_tree *tree,
				   struct dn_list *list)
{
	const bool ascending = true;

	return ldb_kv_index_dn_ordered(module, ldb_kv, tree, list, ascending);
}
/*
 * Indexed "<=" search: the descending form of
 * ldb_kv_index_dn_ordered().
 */
static int ldb_kv_index_dn_less(struct ldb_module *module,
				struct ldb_kv_private *ldb_kv,
				const struct ldb_parse_tree *tree,
				struct dn_list *list)
{
	const bool ascending = false;

	return ldb_kv_index_dn_ordered(module, ldb_kv, tree, list, ascending);
}
2073 return a list of matching objects using a one-level index
2075 static int ldb_kv_index_dn_attr(struct ldb_module *module,
2076 struct ldb_kv_private *ldb_kv,
2077 const char *attr,
2078 struct ldb_dn *dn,
2079 struct dn_list *list,
2080 enum key_truncation *truncation)
2082 struct ldb_context *ldb;
2083 struct ldb_dn *key;
2084 struct ldb_val val;
2085 int ret;
2087 ldb = ldb_module_get_ctx(module);
2089 /* work out the index key from the parent DN */
2090 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2091 if (val.data == NULL) {
2092 const char *dn_str = ldb_dn_get_linearized(dn);
2093 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2094 __location__
2095 ": Failed to get casefold DN "
2096 "from: %s",
2097 dn_str);
2098 return LDB_ERR_OPERATIONS_ERROR;
2100 val.length = strlen((char *)val.data);
2101 key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
2102 if (!key) {
2103 ldb_oom(ldb);
2104 return LDB_ERR_OPERATIONS_ERROR;
2107 ret = ldb_kv_dn_list_load(module, ldb_kv, key, list,
2108 DN_LIST_WILL_BE_READ_ONLY);
2109 talloc_free(key);
2110 if (ret != LDB_SUCCESS) {
2111 return ret;
2114 if (list->count == 0) {
2115 return LDB_ERR_NO_SUCH_OBJECT;
2118 return LDB_SUCCESS;
2122 return a list of matching objects using a one-level index
2124 static int ldb_kv_index_dn_one(struct ldb_module *module,
2125 struct ldb_kv_private *ldb_kv,
2126 struct ldb_dn *parent_dn,
2127 struct dn_list *list,
2128 enum key_truncation *truncation)
2130 int ret = ldb_kv_index_dn_attr(
2131 module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
2132 if (ret == LDB_SUCCESS) {
2134 * Ensure we do not shortcut on intersection for this
2135 * list. We must never be lazy and return an entry
2136 * not in this list. This allows the index for
2137 * SCOPE_ONELEVEL to be trusted.
2140 list->strict = true;
2142 return ret;
2146 return a list of matching objects using the DN index
2148 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
2149 struct ldb_kv_private *ldb_kv,
2150 struct ldb_dn *base_dn,
2151 struct dn_list *dn_list,
2152 enum key_truncation *truncation)
2154 const struct ldb_val *guid_val = NULL;
2155 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2156 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2157 if (dn_list->dn == NULL) {
2158 return ldb_module_oom(module);
2160 dn_list->dn[0].data = discard_const_p(unsigned char,
2161 ldb_dn_get_linearized(base_dn));
2162 if (dn_list->dn[0].data == NULL) {
2163 talloc_free(dn_list->dn);
2164 return ldb_module_oom(module);
2166 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
2167 dn_list->count = 1;
2169 return LDB_SUCCESS;
2172 if (ldb_kv->cache->GUID_index_dn_component != NULL) {
2173 guid_val = ldb_dn_get_extended_component(
2174 base_dn, ldb_kv->cache->GUID_index_dn_component);
2177 if (guid_val != NULL) {
2178 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
2179 if (dn_list->dn == NULL) {
2180 return ldb_module_oom(module);
2182 dn_list->dn[0].data = guid_val->data;
2183 dn_list->dn[0].length = guid_val->length;
2184 dn_list->count = 1;
2186 return LDB_SUCCESS;
2189 return ldb_kv_index_dn_attr(
2190 module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
2194 return a list of dn's that might match a indexed search or
2195 an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
2197 static int ldb_kv_index_dn(struct ldb_module *module,
2198 struct ldb_kv_private *ldb_kv,
2199 const struct ldb_parse_tree *tree,
2200 struct dn_list *list)
2202 int ret = LDB_ERR_OPERATIONS_ERROR;
2204 switch (tree->operation) {
2205 case LDB_OP_AND:
2206 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
2207 break;
2209 case LDB_OP_OR:
2210 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
2211 break;
2213 case LDB_OP_NOT:
2214 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
2215 break;
2217 case LDB_OP_EQUALITY:
2218 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
2219 break;
2221 case LDB_OP_GREATER:
2222 ret = ldb_kv_index_dn_greater(module, ldb_kv, tree, list);
2223 break;
2225 case LDB_OP_LESS:
2226 ret = ldb_kv_index_dn_less(module, ldb_kv, tree, list);
2227 break;
2229 case LDB_OP_SUBSTRING:
2230 case LDB_OP_PRESENT:
2231 case LDB_OP_APPROX:
2232 case LDB_OP_EXTENDED:
2233 /* we can't index with fancy bitops yet */
2234 ret = LDB_ERR_OPERATIONS_ERROR;
2235 break;
2238 return ret;
2242 filter a candidate dn_list from an indexed search into a set of results
2243 extracting just the given attributes
2245 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
2246 const struct dn_list *dn_list,
2247 struct ldb_kv_context *ac,
2248 uint32_t *match_count,
2249 enum key_truncation scope_one_truncation)
2251 struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2252 struct ldb_message *msg;
2253 struct ldb_message *filtered_msg;
2254 unsigned int i;
2255 unsigned int num_keys = 0;
2256 uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
2257 struct ldb_val *keys = NULL;
2260 * We have to allocate the key list (rather than just walk the
2261 * caller supplied list) as the callback could change the list
2262 * (by modifying an indexed attribute hosted in the in-memory
2263 * index cache!)
2265 keys = talloc_array(ac, struct ldb_val, dn_list->count);
2266 if (keys == NULL) {
2267 return ldb_module_oom(ac->module);
2270 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2272 * We speculate that the keys will be GUID based and so
2273 * pre-fill in enough space for a GUID (avoiding a pile of
2274 * small allocations)
2276 struct guid_tdb_key {
2277 uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2278 } *key_values = NULL;
2280 key_values = talloc_array(keys,
2281 struct guid_tdb_key,
2282 dn_list->count);
2284 if (key_values == NULL) {
2285 talloc_free(keys);
2286 return ldb_module_oom(ac->module);
2288 for (i = 0; i < dn_list->count; i++) {
2289 keys[i].data = key_values[i].guid_key;
2290 keys[i].length = sizeof(key_values[i].guid_key);
2292 } else {
2293 for (i = 0; i < dn_list->count; i++) {
2294 keys[i].data = NULL;
2295 keys[i].length = 0;
2299 for (i = 0; i < dn_list->count; i++) {
2300 int ret;
2302 ret = ldb_kv_idx_to_key(
2303 ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
2304 if (ret != LDB_SUCCESS) {
2305 talloc_free(keys);
2306 return ret;
2309 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2311 * If we are in GUID index mode, then the dn_list is
2312 * sorted. If we got a duplicate, forget about it, as
2313 * otherwise we would send the same entry back more
2314 * than once.
2316 * This is needed in the truncated DN case, or if a
2317 * duplicate was forced in via
2318 * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2321 if (memcmp(previous_guid_key,
2322 keys[num_keys].data,
2323 sizeof(previous_guid_key)) == 0) {
2324 continue;
2327 memcpy(previous_guid_key,
2328 keys[num_keys].data,
2329 sizeof(previous_guid_key));
2331 num_keys++;
2336 * Now that the list is a safe copy, send the callbacks
2338 for (i = 0; i < num_keys; i++) {
2339 int ret;
2340 bool matched;
2341 msg = ldb_msg_new(ac);
2342 if (!msg) {
2343 talloc_free(keys);
2344 return LDB_ERR_OPERATIONS_ERROR;
2347 ret =
2348 ldb_kv_search_key(ac->module,
2349 ldb_kv,
2350 keys[i],
2351 msg,
2352 LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC |
2354 * The entry point ldb_kv_search_indexed is
2355 * only called from the read-locked
2356 * ldb_kv_search.
2358 LDB_UNPACK_DATA_FLAG_READ_LOCKED);
2359 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2361 * the record has disappeared? yes, this can
2362 * happen if the entry is deleted by something
2363 * operating in the callback (not another
2364 * process, as we have a read lock)
2366 talloc_free(msg);
2367 continue;
2370 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2371 /* an internal error */
2372 talloc_free(keys);
2373 talloc_free(msg);
2374 return LDB_ERR_OPERATIONS_ERROR;
2378 * We trust the index for LDB_SCOPE_ONELEVEL
2379 * unless the index key has been truncated.
2381 * LDB_SCOPE_BASE is not passed in by our only caller.
2383 if (ac->scope == LDB_SCOPE_ONELEVEL &&
2384 ldb_kv->cache->one_level_indexes &&
2385 scope_one_truncation == KEY_NOT_TRUNCATED) {
2386 ret = ldb_match_message(ldb, msg, ac->tree,
2387 ac->scope, &matched);
2388 } else {
2389 ret = ldb_match_msg_error(ldb, msg,
2390 ac->tree, ac->base,
2391 ac->scope, &matched);
2394 if (ret != LDB_SUCCESS) {
2395 talloc_free(keys);
2396 talloc_free(msg);
2397 return ret;
2399 if (!matched) {
2400 talloc_free(msg);
2401 continue;
2404 filtered_msg = ldb_msg_new(ac);
2405 if (filtered_msg == NULL) {
2406 TALLOC_FREE(keys);
2407 TALLOC_FREE(msg);
2408 return LDB_ERR_OPERATIONS_ERROR;
2411 filtered_msg->dn = talloc_steal(filtered_msg, msg->dn);
2413 /* filter the attributes that the user wants */
2414 ret = ldb_kv_filter_attrs(ldb, msg, ac->attrs, filtered_msg);
2416 talloc_free(msg);
2418 if (ret == -1) {
2419 TALLOC_FREE(filtered_msg);
2420 talloc_free(keys);
2421 return LDB_ERR_OPERATIONS_ERROR;
2424 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
2425 if (ret != LDB_SUCCESS) {
2426 /* Regardless of success or failure, the msg
2427 * is the callbacks responsiblity, and should
2428 * not be talloc_free()'ed */
2429 ac->request_terminated = true;
2430 talloc_free(keys);
2431 return ret;
2434 (*match_count)++;
2437 TALLOC_FREE(keys);
2438 return LDB_SUCCESS;
2442 sort a DN list
2444 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
2445 struct dn_list *list)
2447 if (list->count < 2) {
2448 return;
2451 /* We know the list is sorted when using the GUID index */
2452 if (ltdb->cache->GUID_index_attribute != NULL) {
2453 return;
2456 TYPESAFE_QSORT(list->dn, list->count,
2457 ldb_val_equal_exact_for_qsort);
2461 search the database with a LDAP-like expression using indexes
2462 returns -1 if an indexed search is not possible, in which
2463 case the caller should call ltdb_search_full()
2465 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
2467 struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
2468 struct ldb_kv_private *ldb_kv = talloc_get_type(
2469 ldb_module_get_private(ac->module), struct ldb_kv_private);
2470 struct dn_list *dn_list;
2471 int ret;
2472 enum ldb_scope index_scope;
2473 enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
2475 /* see if indexing is enabled */
2476 if (!ldb_kv->cache->attribute_indexes &&
2477 !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
2478 /* fallback to a full search */
2479 return LDB_ERR_OPERATIONS_ERROR;
2482 dn_list = talloc_zero(ac, struct dn_list);
2483 if (dn_list == NULL) {
2484 return ldb_module_oom(ac->module);
2488 * For the purposes of selecting the switch arm below, if we
2489 * don't have a one-level index then treat it like a subtree
2490 * search
2492 if (ac->scope == LDB_SCOPE_ONELEVEL &&
2493 !ldb_kv->cache->one_level_indexes) {
2494 index_scope = LDB_SCOPE_SUBTREE;
2495 } else {
2496 index_scope = ac->scope;
2499 switch (index_scope) {
2500 case LDB_SCOPE_BASE:
2502 * The only caller will have filtered the operation out
2503 * so we should never get here
2505 return ldb_operr(ldb);
2507 case LDB_SCOPE_ONELEVEL:
2510 * First, load all the one-level child objects (regardless of
2511 * whether they match the search filter or not). The database
2512 * maintains a one-level index, so retrieving this is quick.
2514 ret = ldb_kv_index_dn_one(ac->module,
2515 ldb_kv,
2516 ac->base,
2517 dn_list,
2518 &scope_one_truncation);
2519 if (ret != LDB_SUCCESS) {
2520 talloc_free(dn_list);
2521 return ret;
2525 * If we have too many children, running ldb_kv_index_filter()
2526 * over all the child objects can be quite expensive. So next
2527 * we do a separate indexed query using the search filter.
2529 * This should be quick, but it may return objects that are not
2530 * the direct one-level child objects we're interested in.
2532 * We only do this in the GUID index mode, which is
2533 * O(n*log(m)) otherwise the intersection below will
2534 * be too costly at O(n*m).
2536 * We don't set a heuristic for 'too many' but instead
2537 * do it always and rely on the index lookup being
2538 * fast enough in the small case.
2540 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2541 struct dn_list *indexed_search_result
2542 = talloc_zero(ac, struct dn_list);
2543 if (indexed_search_result == NULL) {
2544 talloc_free(dn_list);
2545 return ldb_module_oom(ac->module);
2548 if (!ldb_kv->cache->attribute_indexes) {
2549 talloc_free(indexed_search_result);
2550 talloc_free(dn_list);
2551 return LDB_ERR_OPERATIONS_ERROR;
2555 * Try to do an indexed database search
2557 ret = ldb_kv_index_dn(
2558 ac->module, ldb_kv, ac->tree,
2559 indexed_search_result);
2562 * We can stop if we're sure the object doesn't exist
2564 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2565 talloc_free(indexed_search_result);
2566 talloc_free(dn_list);
2567 return LDB_ERR_NO_SUCH_OBJECT;
2571 * Once we have a successful search result, we
2572 * intersect it with the one-level children (dn_list).
2573 * This should give us exactly the result we're after
2574 * (we still need to run ldb_kv_index_filter() to
2575 * handle potential index truncation cases).
2577 * The indexed search may fail because we don't support
2578 * indexing on that type of search operation, e.g.
2579 * matching against '*'. In which case we fall through
2580 * and run ldb_kv_index_filter() over all the one-level
2581 * children (which is still better than bailing out here
2582 * and falling back to a full DB scan).
2584 if (ret == LDB_SUCCESS) {
2585 if (!list_intersect(ldb_kv,
2586 dn_list,
2587 indexed_search_result)) {
2588 talloc_free(indexed_search_result);
2589 talloc_free(dn_list);
2590 return LDB_ERR_OPERATIONS_ERROR;
2594 break;
2596 case LDB_SCOPE_SUBTREE:
2597 case LDB_SCOPE_DEFAULT:
2598 if (!ldb_kv->cache->attribute_indexes) {
2599 talloc_free(dn_list);
2600 return LDB_ERR_OPERATIONS_ERROR;
2603 * Here we load the index for the tree. We have no
2604 * index for the subtree.
2606 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2607 if (ret != LDB_SUCCESS) {
2608 talloc_free(dn_list);
2609 return ret;
2611 break;
2615 * It is critical that this function do the re-filter even
2616 * on things found by the index as the index can over-match
2617 * in cases of truncation (as well as when it decides it is
2618 * not worth further filtering)
2620 * If this changes, then the index code above would need to
2621 * pass up a flag to say if any index was truncated during
2622 * processing as the truncation here refers only to the
2623 * SCOPE_ONELEVEL index.
2625 ret = ldb_kv_index_filter(
2626 ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2627 talloc_free(dn_list);
2628 return ret;
2632 * @brief Add a DN in the index list of a given attribute name/value pair
2634 * This function will add the DN in the index list for the index for
2635 * the given attribute name and value.
2637 * @param[in] module A ldb_module structure
2639 * @param[in] dn The string representation of the DN as it
2640 * will be stored in the index entry
2642 * @param[in] el A ldb_message_element array, one of the entry
2643 * referred by the v_idx is the attribute name and
2644 * value pair which will be used to construct the
2645 * index name
2647 * @param[in] v_idx The index of element in the el array to use
2649 * @return An ldb error code
2651 static int ldb_kv_index_add1(struct ldb_module *module,
2652 struct ldb_kv_private *ldb_kv,
2653 const struct ldb_message *msg,
2654 struct ldb_message_element *el,
2655 int v_idx)
2657 struct ldb_context *ldb;
2658 struct ldb_dn *dn_key;
2659 int ret;
2660 const struct ldb_schema_attribute *a;
2661 struct dn_list *list;
2662 unsigned alloc_len;
2663 enum key_truncation truncation = KEY_TRUNCATED;
2666 ldb = ldb_module_get_ctx(module);
2668 list = talloc_zero(module, struct dn_list);
2669 if (list == NULL) {
2670 return LDB_ERR_OPERATIONS_ERROR;
2673 dn_key = ldb_kv_index_key(
2674 ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2675 if (!dn_key) {
2676 talloc_free(list);
2677 return LDB_ERR_OPERATIONS_ERROR;
2680 * Samba only maintains unique indexes on the objectSID and objectGUID
2681 * so if a unique index key exceeds the maximum length there is a
2682 * problem.
2684 if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2685 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2686 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2688 ldb_asprintf_errstring(
2689 ldb,
2690 __location__ ": unique index key on %s in %s, "
2691 "exceeds maximum key length of %u (encoded).",
2692 el->name,
2693 ldb_dn_get_linearized(msg->dn),
2694 ldb_kv->max_key_length);
2695 talloc_free(list);
2696 return LDB_ERR_CONSTRAINT_VIOLATION;
2698 talloc_steal(list, dn_key);
2700 ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
2701 DN_LIST_MUTABLE);
2702 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2703 talloc_free(list);
2704 return ret;
2708 * Check for duplicates in the @IDXDN DN -> GUID record
2710 * This is very normal, it just means a duplicate DN creation
2711 * was attempted, so don't set the error string or print scary
2712 * messages.
2714 if (list->count > 0 &&
2715 ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2716 truncation == KEY_NOT_TRUNCATED) {
2718 talloc_free(list);
2719 return LDB_ERR_CONSTRAINT_VIOLATION;
2721 } else if (list->count > 0
2722 && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2725 * At least one existing entry in the DN->GUID index, which
2726 * arises when the DN indexes have been truncated
2728 * So need to pull the DN's to check if it's really a duplicate
2730 unsigned int i;
2731 for (i=0; i < list->count; i++) {
2732 uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2733 struct ldb_val key = {
2734 .data = guid_key,
2735 .length = sizeof(guid_key)
2737 const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2738 struct ldb_message *rec = ldb_msg_new(ldb);
2739 if (rec == NULL) {
2740 return LDB_ERR_OPERATIONS_ERROR;
2743 ret = ldb_kv_idx_to_key(
2744 module, ldb_kv, ldb, &list->dn[i], &key);
2745 if (ret != LDB_SUCCESS) {
2746 TALLOC_FREE(list);
2747 TALLOC_FREE(rec);
2748 return ret;
2751 ret =
2752 ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2753 if (key.data != guid_key) {
2754 TALLOC_FREE(key.data);
2756 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2758 * the record has disappeared?
2759 * yes, this can happen
2761 talloc_free(rec);
2762 continue;
2765 if (ret != LDB_SUCCESS) {
2766 /* an internal error */
2767 TALLOC_FREE(rec);
2768 TALLOC_FREE(list);
2769 return LDB_ERR_OPERATIONS_ERROR;
2772 * The DN we are trying to add to the DB and index
2773 * is already here, so we must deny the addition
2775 if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2776 TALLOC_FREE(rec);
2777 TALLOC_FREE(list);
2778 return LDB_ERR_CONSTRAINT_VIOLATION;
2784 * Check for duplicates in unique indexes
2786 * We don't need to do a loop test like the @IDXDN case
2787 * above as we have a ban on long unique index values
2788 * at the start of this function.
2790 if (list->count > 0 &&
2791 ((a != NULL
2792 && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2793 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2795 * We do not want to print info about a possibly
2796 * confidential DN that the conflict was with in the
2797 * user-visible error string
2800 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2801 ldb_debug(ldb, LDB_DEBUG_WARNING,
2802 __location__
2803 ": unique index violation on %s in %s, "
2804 "conflicts with %*.*s in %s",
2805 el->name, ldb_dn_get_linearized(msg->dn),
2806 (int)list->dn[0].length,
2807 (int)list->dn[0].length,
2808 list->dn[0].data,
2809 ldb_dn_get_linearized(dn_key));
2810 } else {
2811 /* This can't fail, gives a default at worst */
2812 const struct ldb_schema_attribute *attr =
2813 ldb_schema_attribute_by_name(
2814 ldb, ldb_kv->cache->GUID_index_attribute);
2815 struct ldb_val v;
2816 ret = attr->syntax->ldif_write_fn(ldb, list,
2817 &list->dn[0], &v);
2818 if (ret == LDB_SUCCESS) {
2819 ldb_debug(ldb,
2820 LDB_DEBUG_WARNING,
2821 __location__
2822 ": unique index violation on %s in "
2823 "%s, conflicts with %s %*.*s in %s",
2824 el->name,
2825 ldb_dn_get_linearized(msg->dn),
2826 ldb_kv->cache->GUID_index_attribute,
2827 (int)v.length,
2828 (int)v.length,
2829 v.data,
2830 ldb_dn_get_linearized(dn_key));
2833 ldb_asprintf_errstring(ldb,
2834 __location__ ": unique index violation "
2835 "on %s in %s",
2836 el->name,
2837 ldb_dn_get_linearized(msg->dn));
2838 talloc_free(list);
2839 return LDB_ERR_CONSTRAINT_VIOLATION;
2842 /* overallocate the list a bit, to reduce the number of
2843 * realloc trigered copies */
2844 alloc_len = ((list->count+1)+7) & ~7;
2845 list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2846 if (list->dn == NULL) {
2847 talloc_free(list);
2848 return LDB_ERR_OPERATIONS_ERROR;
2851 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2852 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2853 list->dn[list->count].data
2854 = (uint8_t *)talloc_strdup(list->dn, dn_str);
2855 if (list->dn[list->count].data == NULL) {
2856 talloc_free(list);
2857 return LDB_ERR_OPERATIONS_ERROR;
2859 list->dn[list->count].length = strlen(dn_str);
2860 } else {
2861 const struct ldb_val *key_val;
2862 struct ldb_val *exact = NULL, *next = NULL;
2863 key_val = ldb_msg_find_ldb_val(
2864 msg, ldb_kv->cache->GUID_index_attribute);
2865 if (key_val == NULL) {
2866 talloc_free(list);
2867 return ldb_module_operr(module);
2870 if (key_val->length != LDB_KV_GUID_SIZE) {
2871 talloc_free(list);
2872 return ldb_module_operr(module);
2875 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2876 *key_val, ldb_val_equal_exact_ordered,
2877 exact, next);
2880 * Give a warning rather than fail, this could be a
2881 * duplicate value in the record allowed by a caller
2882 * forcing in the value with
2883 * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2885 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2886 /* This can't fail, gives a default at worst */
2887 const struct ldb_schema_attribute *attr =
2888 ldb_schema_attribute_by_name(
2889 ldb, ldb_kv->cache->GUID_index_attribute);
2890 struct ldb_val v;
2891 ret = attr->syntax->ldif_write_fn(ldb, list,
2892 exact, &v);
2893 if (ret == LDB_SUCCESS) {
2894 ldb_debug(ldb,
2895 LDB_DEBUG_WARNING,
2896 __location__
2897 ": duplicate attribute value in %s "
2898 "for index on %s, "
2899 "duplicate of %s %*.*s in %s",
2900 ldb_dn_get_linearized(msg->dn),
2901 el->name,
2902 ldb_kv->cache->GUID_index_attribute,
2903 (int)v.length,
2904 (int)v.length,
2905 v.data,
2906 ldb_dn_get_linearized(dn_key));
2910 if (next == NULL) {
2911 next = &list->dn[list->count];
2912 } else {
2913 memmove(&next[1], next,
2914 sizeof(*next) * (list->count - (next - list->dn)));
2916 *next = ldb_val_dup(list->dn, key_val);
2917 if (next->data == NULL) {
2918 talloc_free(list);
2919 return ldb_module_operr(module);
2922 list->count++;
2924 ret = ldb_kv_dn_list_store(module, dn_key, list);
2926 talloc_free(list);
2928 return ret;
2932 add index entries for one elements in a message
2934 static int ldb_kv_index_add_el(struct ldb_module *module,
2935 struct ldb_kv_private *ldb_kv,
2936 const struct ldb_message *msg,
2937 struct ldb_message_element *el)
2939 unsigned int i;
2940 for (i = 0; i < el->num_values; i++) {
2941 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2942 if (ret != LDB_SUCCESS) {
2943 return ret;
2947 return LDB_SUCCESS;
2951 add index entries for all elements in a message
2953 static int ldb_kv_index_add_all(struct ldb_module *module,
2954 struct ldb_kv_private *ldb_kv,
2955 const struct ldb_message *msg)
2957 struct ldb_message_element *elements = msg->elements;
2958 unsigned int i;
2959 const char *dn_str;
2960 int ret;
2962 if (ldb_dn_is_special(msg->dn)) {
2963 return LDB_SUCCESS;
2966 dn_str = ldb_dn_get_linearized(msg->dn);
2967 if (dn_str == NULL) {
2968 return LDB_ERR_OPERATIONS_ERROR;
2971 ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2972 if (ret != LDB_SUCCESS) {
2973 return ret;
2976 if (!ldb_kv->cache->attribute_indexes) {
2977 /* no indexed fields */
2978 return LDB_SUCCESS;
2981 for (i = 0; i < msg->num_elements; i++) {
2982 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2983 continue;
2985 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2986 if (ret != LDB_SUCCESS) {
2987 struct ldb_context *ldb = ldb_module_get_ctx(module);
2988 ldb_asprintf_errstring(ldb,
2989 __location__ ": Failed to re-index %s in %s - %s",
2990 elements[i].name, dn_str,
2991 ldb_errstring(ldb));
2992 return ret;
2996 return LDB_SUCCESS;
3001 insert a DN index for a message
3003 static int ldb_kv_modify_index_dn(struct ldb_module *module,
3004 struct ldb_kv_private *ldb_kv,
3005 const struct ldb_message *msg,
3006 struct ldb_dn *dn,
3007 const char *index,
3008 int add)
3010 struct ldb_message_element el;
3011 struct ldb_val val;
3012 int ret;
3014 val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
3015 if (val.data == NULL) {
3016 const char *dn_str = ldb_dn_get_linearized(dn);
3017 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3018 __location__ ": Failed to modify %s "
3019 "against %s in %s: failed "
3020 "to get casefold DN",
3021 index,
3022 ldb_kv->cache->GUID_index_attribute,
3023 dn_str);
3024 return LDB_ERR_OPERATIONS_ERROR;
3027 val.length = strlen((char *)val.data);
3028 el.name = index;
3029 el.values = &val;
3030 el.num_values = 1;
3032 if (add) {
3033 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
3034 } else { /* delete */
3035 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
3038 if (ret != LDB_SUCCESS) {
3039 struct ldb_context *ldb = ldb_module_get_ctx(module);
3040 const char *dn_str = ldb_dn_get_linearized(dn);
3041 ldb_asprintf_errstring(ldb,
3042 __location__ ": Failed to modify %s "
3043 "against %s in %s - %s",
3044 index,
3045 ldb_kv->cache->GUID_index_attribute,
3046 dn_str,
3047 ldb_errstring(ldb));
3048 return ret;
3050 return ret;
3054 insert a one level index for a message
3056 static int ldb_kv_index_onelevel(struct ldb_module *module,
3057 const struct ldb_message *msg,
3058 int add)
3060 struct ldb_kv_private *ldb_kv = talloc_get_type(
3061 ldb_module_get_private(module), struct ldb_kv_private);
3062 struct ldb_dn *pdn;
3063 int ret;
3065 /* We index for ONE Level only if requested */
3066 if (!ldb_kv->cache->one_level_indexes) {
3067 return LDB_SUCCESS;
3070 pdn = ldb_dn_get_parent(module, msg->dn);
3071 if (pdn == NULL) {
3072 return LDB_ERR_OPERATIONS_ERROR;
3074 ret =
3075 ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
3077 talloc_free(pdn);
3079 return ret;
3083 insert a one level index for a message
3085 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
3086 const struct ldb_message *msg,
3087 int add)
3089 int ret;
3090 struct ldb_kv_private *ldb_kv = talloc_get_type(
3091 ldb_module_get_private(module), struct ldb_kv_private);
3093 /* We index for DN only if using a GUID index */
3094 if (ldb_kv->cache->GUID_index_attribute == NULL) {
3095 return LDB_SUCCESS;
3098 ret = ldb_kv_modify_index_dn(
3099 module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
3101 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
3102 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3103 "Entry %s already exists",
3104 ldb_dn_get_linearized(msg->dn));
3105 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
3107 return ret;
3111 add the index entries for a new element in a record
3112 The caller guarantees that these element values are not yet indexed
3114 int ldb_kv_index_add_element(struct ldb_module *module,
3115 struct ldb_kv_private *ldb_kv,
3116 const struct ldb_message *msg,
3117 struct ldb_message_element *el)
3119 if (ldb_dn_is_special(msg->dn)) {
3120 return LDB_SUCCESS;
3122 if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3123 return LDB_SUCCESS;
3125 return ldb_kv_index_add_el(module, ldb_kv, msg, el);
3129 add the index entries for a new record
3131 int ldb_kv_index_add_new(struct ldb_module *module,
3132 struct ldb_kv_private *ldb_kv,
3133 const struct ldb_message *msg)
3135 int ret;
3137 if (ldb_dn_is_special(msg->dn)) {
3138 return LDB_SUCCESS;
3141 ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3142 if (ret != LDB_SUCCESS) {
3144 * Because we can't trust the caller to be doing
3145 * transactions properly, clean up any index for this
3146 * entry rather than relying on a transaction
3147 * cleanup
3150 ldb_kv_index_delete(module, msg);
3151 return ret;
3154 ret = ldb_kv_index_onelevel(module, msg, 1);
3155 if (ret != LDB_SUCCESS) {
3157 * Because we can't trust the caller to be doing
3158 * transactions properly, clean up any index for this
3159 * entry rather than relying on a transaction
3160 * cleanup
3162 ldb_kv_index_delete(module, msg);
3163 return ret;
3165 return ret;
3170 delete an index entry for one message element
3172 int ldb_kv_index_del_value(struct ldb_module *module,
3173 struct ldb_kv_private *ldb_kv,
3174 const struct ldb_message *msg,
3175 struct ldb_message_element *el,
3176 unsigned int v_idx)
3178 struct ldb_context *ldb;
3179 struct ldb_dn *dn_key;
3180 const char *dn_str;
3181 int ret, i;
3182 unsigned int j;
3183 struct dn_list *list;
3184 struct ldb_dn *dn = msg->dn;
3185 enum key_truncation truncation = KEY_NOT_TRUNCATED;
3187 ldb = ldb_module_get_ctx(module);
3189 dn_str = ldb_dn_get_linearized(dn);
3190 if (dn_str == NULL) {
3191 return LDB_ERR_OPERATIONS_ERROR;
3194 if (dn_str[0] == '@') {
3195 return LDB_SUCCESS;
3198 dn_key = ldb_kv_index_key(
3199 ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
3201 * We ignore key truncation in ltdb_index_add1() so
3202 * match that by ignoring it here as well
3204 * Multiple values are legitimate and accepted
3206 if (!dn_key) {
3207 return LDB_ERR_OPERATIONS_ERROR;
3210 list = talloc_zero(dn_key, struct dn_list);
3211 if (list == NULL) {
3212 talloc_free(dn_key);
3213 return LDB_ERR_OPERATIONS_ERROR;
3216 ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list,
3217 DN_LIST_MUTABLE);
3218 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
3219 /* it wasn't indexed. Did we have an earlier error? If we did then
3220 its gone now */
3221 talloc_free(dn_key);
3222 return LDB_SUCCESS;
3225 if (ret != LDB_SUCCESS) {
3226 talloc_free(dn_key);
3227 return ret;
3231 * Find one of the values matching this message to remove
3233 i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
3234 if (i == -1) {
3235 /* nothing to delete */
3236 talloc_free(dn_key);
3237 return LDB_SUCCESS;
3240 j = (unsigned int) i;
3241 if (j != list->count - 1) {
3242 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
3244 list->count--;
3245 if (list->count == 0) {
3246 talloc_free(list->dn);
3247 list->dn = NULL;
3248 } else {
3249 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
3252 ret = ldb_kv_dn_list_store(module, dn_key, list);
3254 talloc_free(dn_key);
3256 return ret;
3260 delete the index entries for a element
3261 return -1 on failure
3263 int ldb_kv_index_del_element(struct ldb_module *module,
3264 struct ldb_kv_private *ldb_kv,
3265 const struct ldb_message *msg,
3266 struct ldb_message_element *el)
3268 const char *dn_str;
3269 int ret;
3270 unsigned int i;
3272 if (!ldb_kv->cache->attribute_indexes) {
3273 /* no indexed fields */
3274 return LDB_SUCCESS;
3277 dn_str = ldb_dn_get_linearized(msg->dn);
3278 if (dn_str == NULL) {
3279 return LDB_ERR_OPERATIONS_ERROR;
3282 if (dn_str[0] == '@') {
3283 return LDB_SUCCESS;
3286 if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
3287 return LDB_SUCCESS;
3289 for (i = 0; i < el->num_values; i++) {
3290 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
3291 if (ret != LDB_SUCCESS) {
3292 return ret;
3296 return LDB_SUCCESS;
3300 delete the index entries for a record
3301 return -1 on failure
3303 int ldb_kv_index_delete(struct ldb_module *module,
3304 const struct ldb_message *msg)
3306 struct ldb_kv_private *ldb_kv = talloc_get_type(
3307 ldb_module_get_private(module), struct ldb_kv_private);
3308 int ret;
3309 unsigned int i;
3311 if (ldb_dn_is_special(msg->dn)) {
3312 return LDB_SUCCESS;
3315 ret = ldb_kv_index_onelevel(module, msg, 0);
3316 if (ret != LDB_SUCCESS) {
3317 return ret;
3320 ret = ldb_kv_write_index_dn_guid(module, msg, 0);
3321 if (ret != LDB_SUCCESS) {
3322 return ret;
3325 if (!ldb_kv->cache->attribute_indexes) {
3326 /* no indexed fields */
3327 return LDB_SUCCESS;
3330 for (i = 0; i < msg->num_elements; i++) {
3331 ret = ldb_kv_index_del_element(
3332 module, ldb_kv, msg, &msg->elements[i]);
3333 if (ret != LDB_SUCCESS) {
3334 return ret;
3338 return LDB_SUCCESS;
3343 traversal function that deletes all @INDEX records in the in-memory
3344 TDB.
3346 This does not touch the actual DB, that is done at transaction
3347 commit, which in turn greatly reduces DB churn as we will likely
3348 be able to do a direct update into the old record.
3350 static int delete_index(struct ldb_kv_private *ldb_kv,
3351 struct ldb_val key,
3352 _UNUSED_ struct ldb_val data,
3353 void *state)
3355 struct ldb_module *module = state;
3356 const char *dnstr = "DN=" LDB_KV_INDEX ":";
3357 struct dn_list list;
3358 struct ldb_dn *dn;
3359 struct ldb_val v;
3360 int ret;
3362 if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
3363 return 0;
3365 /* we need to put a empty list in the internal tdb for this
3366 * index entry */
3367 list.dn = NULL;
3368 list.count = 0;
3370 /* the offset of 3 is to remove the DN= prefix. */
3371 v.data = key.data + 3;
3372 v.length = strnlen((char *)key.data, key.length) - 3;
3374 dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
3377 * This does not actually touch the DB quite yet, just
3378 * the in-memory index cache
3380 ret = ldb_kv_dn_list_store(module, dn, &list);
3381 if (ret != LDB_SUCCESS) {
3382 ldb_asprintf_errstring(ldb_module_get_ctx(module),
3383 "Unable to store null index for %s\n",
3384 ldb_dn_get_linearized(dn));
3385 talloc_free(dn);
3386 return -1;
3388 talloc_free(dn);
3389 return 0;
3393 traversal function that adds @INDEX records during a re index TODO wrong comment
3395 static int re_key(struct ldb_kv_private *ldb_kv,
3396 struct ldb_val key,
3397 struct ldb_val val,
3398 void *state)
3400 struct ldb_context *ldb;
3401 struct ldb_kv_reindex_context *ctx =
3402 (struct ldb_kv_reindex_context *)state;
3403 struct ldb_module *module = ldb_kv->module;
3404 struct ldb_message *msg;
3405 int ret;
3406 struct ldb_val key2;
3407 bool is_record;
3409 ldb = ldb_module_get_ctx(module);
3411 is_record = ldb_kv_key_is_normal_record(key);
3412 if (is_record == false) {
3413 return 0;
3416 msg = ldb_msg_new(module);
3417 if (msg == NULL) {
3418 return -1;
3421 ret = ldb_unpack_data(ldb, &val, msg);
3422 if (ret != 0) {
3423 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3424 ldb_dn_get_linearized(msg->dn));
3425 ctx->error = ret;
3426 talloc_free(msg);
3427 return -1;
3430 if (msg->dn == NULL) {
3431 ldb_debug(ldb, LDB_DEBUG_ERROR,
3432 "Refusing to re-index as GUID "
3433 "key %*.*s with no DN\n",
3434 (int)key.length, (int)key.length,
3435 (char *)key.data);
3436 talloc_free(msg);
3437 return -1;
3440 /* check if the DN key has changed, perhaps due to the case
3441 insensitivity of an element changing, or a change from DN
3442 to GUID keys */
3443 key2 = ldb_kv_key_msg(module, msg, msg);
3444 if (key2.data == NULL) {
3445 /* probably a corrupt record ... darn */
3446 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
3447 ldb_dn_get_linearized(msg->dn));
3448 talloc_free(msg);
3449 return 0;
3451 if (key.length != key2.length ||
3452 (memcmp(key.data, key2.data, key.length) != 0)) {
3453 ldb_kv->kv_ops->update_in_iterate(
3454 ldb_kv, key, key2, val, ctx);
3456 talloc_free(key2.data);
3458 talloc_free(msg);
3460 ctx->count++;
3461 if (ctx->count % 10000 == 0) {
3462 ldb_debug(ldb, LDB_DEBUG_WARNING,
3463 "Reindexing: re-keyed %u records so far",
3464 ctx->count);
3467 return 0;
3471 traversal function that adds @INDEX records during a re index
3473 static int re_index(struct ldb_kv_private *ldb_kv,
3474 struct ldb_val key,
3475 struct ldb_val val,
3476 void *state)
3478 struct ldb_context *ldb;
3479 struct ldb_kv_reindex_context *ctx =
3480 (struct ldb_kv_reindex_context *)state;
3481 struct ldb_module *module = ldb_kv->module;
3482 struct ldb_message *msg;
3483 int ret;
3484 bool is_record;
3486 ldb = ldb_module_get_ctx(module);
3488 is_record = ldb_kv_key_is_normal_record(key);
3489 if (is_record == false) {
3490 return 0;
3493 msg = ldb_msg_new(module);
3494 if (msg == NULL) {
3495 return -1;
3498 ret = ldb_unpack_data(ldb, &val, msg);
3499 if (ret != 0) {
3500 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3501 ldb_dn_get_linearized(msg->dn));
3502 ctx->error = ret;
3503 talloc_free(msg);
3504 return -1;
3507 if (msg->dn == NULL) {
3508 ldb_debug(ldb, LDB_DEBUG_ERROR,
3509 "Refusing to re-index as GUID "
3510 "key %*.*s with no DN\n",
3511 (int)key.length, (int)key.length,
3512 (char *)key.data);
3513 talloc_free(msg);
3514 return -1;
3517 ret = ldb_kv_index_onelevel(module, msg, 1);
3518 if (ret != LDB_SUCCESS) {
3519 ldb_debug(ldb, LDB_DEBUG_ERROR,
3520 "Adding special ONE LEVEL index failed (%s)!",
3521 ldb_dn_get_linearized(msg->dn));
3522 talloc_free(msg);
3523 return -1;
3526 ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3528 if (ret != LDB_SUCCESS) {
3529 ctx->error = ret;
3530 talloc_free(msg);
3531 return -1;
3534 talloc_free(msg);
3536 ctx->count++;
3537 if (ctx->count % 10000 == 0) {
3538 ldb_debug(ldb, LDB_DEBUG_WARNING,
3539 "Reindexing: re-indexed %u records so far",
3540 ctx->count);
3543 return 0;
3547 * Convert the 4-byte pack format version to a number that's slightly
3548 * more intelligible to a user e.g. version 0, 1, 2, etc.
3550 static uint32_t displayable_pack_version(uint32_t version) {
3551 if (version < LDB_PACKING_FORMAT_NODN) {
3552 return version; /* unknown - can't convert */
3555 return (version - LDB_PACKING_FORMAT_NODN);
3558 static int re_pack(struct ldb_kv_private *ldb_kv,
3559 _UNUSED_ struct ldb_val key,
3560 struct ldb_val val,
3561 void *state)
3563 struct ldb_context *ldb;
3564 struct ldb_message *msg;
3565 struct ldb_module *module = ldb_kv->module;
3566 struct ldb_kv_repack_context *ctx =
3567 (struct ldb_kv_repack_context *)state;
3568 int ret;
3570 ldb = ldb_module_get_ctx(module);
3572 msg = ldb_msg_new(module);
3573 if (msg == NULL) {
3574 return -1;
3577 ret = ldb_unpack_data(ldb, &val, msg);
3578 if (ret != 0) {
3579 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: unpack failed: %s\n",
3580 ldb_dn_get_linearized(msg->dn));
3581 ctx->error = ret;
3582 talloc_free(msg);
3583 return -1;
3586 ret = ldb_kv_store(module, msg, TDB_MODIFY);
3587 if (ret != LDB_SUCCESS) {
3588 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack: store failed: %s\n",
3589 ldb_dn_get_linearized(msg->dn));
3590 ctx->error = ret;
3591 talloc_free(msg);
3592 return -1;
3596 * Warn the user that we're repacking the first time we see a normal
3597 * record. This means we never warn if we're repacking a database with
3598 * only @ records. This is because during database initialisation,
3599 * we might repack as initial settings are written out, and we don't
3600 * want to spam the log.
3602 if ((!ctx->normal_record_seen) && (!ldb_dn_is_special(msg->dn))) {
3603 ldb_debug(ldb, LDB_DEBUG_ALWAYS_LOG,
3604 "Repacking database from v%u to v%u format "
3605 "(first record %s)",
3606 displayable_pack_version(ctx->old_version),
3607 displayable_pack_version(ldb_kv->pack_format_version),
3608 ldb_dn_get_linearized(msg->dn));
3609 ctx->normal_record_seen = true;
3612 ctx->count++;
3613 if (ctx->count % 10000 == 0) {
3614 ldb_debug(ldb, LDB_DEBUG_WARNING,
3615 "Repack: re-packed %u records so far",
3616 ctx->count);
3619 talloc_free(msg);
3620 return 0;
3623 int ldb_kv_repack(struct ldb_module *module)
3625 struct ldb_kv_private *ldb_kv = talloc_get_type(
3626 ldb_module_get_private(module), struct ldb_kv_private);
3627 struct ldb_context *ldb = ldb_module_get_ctx(module);
3628 struct ldb_kv_repack_context ctx;
3629 int ret;
3631 ctx.old_version = ldb_kv->pack_format_version;
3632 ctx.count = 0;
3633 ctx.error = LDB_SUCCESS;
3634 ctx.normal_record_seen = false;
3636 ldb_kv->pack_format_version = ldb_kv->target_pack_format_version;
3638 /* Iterate all database records and repack them in the new format */
3639 ret = ldb_kv->kv_ops->iterate(ldb_kv, re_pack, &ctx);
3640 if (ret < 0) {
3641 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack traverse failed: %s",
3642 ldb_errstring(ldb));
3643 return LDB_ERR_OPERATIONS_ERROR;
3646 if (ctx.error != LDB_SUCCESS) {
3647 ldb_debug(ldb, LDB_DEBUG_ERROR, "Repack failed: %s",
3648 ldb_errstring(ldb));
3649 return ctx.error;
3652 return LDB_SUCCESS;
3656 force a complete reindex of the database
3658 int ldb_kv_reindex(struct ldb_module *module)
3660 struct ldb_kv_private *ldb_kv = talloc_get_type(
3661 ldb_module_get_private(module), struct ldb_kv_private);
3662 int ret;
3663 struct ldb_kv_reindex_context ctx;
3664 size_t index_cache_size = 0;
3667 * Only triggered after a modification, but make clear we do
3668 * not re-index a read-only DB
3670 if (ldb_kv->read_only) {
3671 return LDB_ERR_UNWILLING_TO_PERFORM;
3674 if (ldb_kv_cache_reload(module) != 0) {
3675 return LDB_ERR_OPERATIONS_ERROR;
3679 * Ensure we read (and so remove) the entries from the real
3680 * DB, no values stored so far are any use as we want to do a
3681 * re-index
3683 ldb_kv_index_transaction_cancel(module);
3684 if (ldb_kv->nested_idx_ptr != NULL) {
3685 ldb_kv_index_sub_transaction_cancel(ldb_kv);
3689 * Calculate the size of the index cache needed for
3690 * the re-index. If specified always use the
3691 * ldb_kv->index_transaction_cache_size otherwise use the maximum
3692 * of the size estimate or the DEFAULT_INDEX_CACHE_SIZE
3694 if (ldb_kv->index_transaction_cache_size > 0) {
3695 index_cache_size = ldb_kv->index_transaction_cache_size;
3696 } else {
3697 index_cache_size = ldb_kv->kv_ops->get_size(ldb_kv);
3698 if (index_cache_size < DEFAULT_INDEX_CACHE_SIZE) {
3699 index_cache_size = DEFAULT_INDEX_CACHE_SIZE;
3704 * Note that we don't start an index sub transaction for re-indexing
3706 ret = ldb_kv_index_transaction_start(module, index_cache_size);
3707 if (ret != LDB_SUCCESS) {
3708 return ret;
3711 /* first traverse the database deleting any @INDEX records by
3712 * putting NULL entries in the in-memory tdb
3714 ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3715 if (ret < 0) {
3716 struct ldb_context *ldb = ldb_module_get_ctx(module);
3717 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3718 ldb_errstring(ldb));
3719 return LDB_ERR_OPERATIONS_ERROR;
3722 ctx.error = 0;
3723 ctx.count = 0;
3725 ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3726 if (ret < 0) {
3727 struct ldb_context *ldb = ldb_module_get_ctx(module);
3728 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3729 ldb_errstring(ldb));
3730 return LDB_ERR_OPERATIONS_ERROR;
3733 if (ctx.error != LDB_SUCCESS) {
3734 struct ldb_context *ldb = ldb_module_get_ctx(module);
3735 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3736 return ctx.error;
3739 ctx.error = 0;
3740 ctx.count = 0;
3742 /* now traverse adding any indexes for normal LDB records */
3743 ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3744 if (ret < 0) {
3745 struct ldb_context *ldb = ldb_module_get_ctx(module);
3746 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3747 ldb_errstring(ldb));
3748 return LDB_ERR_OPERATIONS_ERROR;
3751 if (ctx.error != LDB_SUCCESS) {
3752 struct ldb_context *ldb = ldb_module_get_ctx(module);
3753 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3754 return ctx.error;
3757 if (ctx.count > 10000) {
3758 ldb_debug(ldb_module_get_ctx(module),
3759 LDB_DEBUG_WARNING,
3760 "Reindexing: re_index successful on %s, "
3761 "final index write-out will be in transaction commit",
3762 ldb_kv->kv_ops->name(ldb_kv));
3764 return LDB_SUCCESS;
3768 * Copy the contents of the nested transaction index cache record to the
3769 * transaction index cache.
3771 * During this 'commit' of the subtransaction to the main transaction
3772 * (cache), care must be taken to free any existing index at the top
3773 * level because otherwise we would leak memory.
3775 static int ldb_kv_sub_transaction_traverse(
3776 struct tdb_context *tdb,
3777 TDB_DATA key,
3778 TDB_DATA data,
3779 void *state)
3781 struct ldb_module *module = state;
3782 struct ldb_kv_private *ldb_kv = talloc_get_type(
3783 ldb_module_get_private(module), struct ldb_kv_private);
3784 TDB_DATA rec = {0};
3785 struct dn_list *index_in_subtransaction = NULL;
3786 struct dn_list *index_in_top_level = NULL;
3787 int ret = 0;
3790 * This unwraps the pointer in the DB into a pointer in
3791 * memory, we are abusing TDB as a hash map, not a linearised
3792 * database store
3794 index_in_subtransaction = ldb_kv_index_idxptr(module, data);
3795 if (index_in_subtransaction == NULL) {
3796 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
3797 return -1;
3801 * Do we already have an entry in the primary transaction cache
3802 * If so free it's dn_list and replace it with the dn_list from
3803 * the secondary cache
3805 * The TDB and so the fetched rec contains NO DATA, just a
3806 * pointer to data held in memory.
3808 rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
3809 if (rec.dptr != NULL) {
3810 index_in_top_level = ldb_kv_index_idxptr(module, rec);
3811 free(rec.dptr);
3812 if (index_in_top_level == NULL) {
3813 abort();
3816 * We had this key at the top level. However we made a copy
3817 * at the sub-transaction level so that we could possibly
3818 * roll back. We have to free the top level index memory
3819 * otherwise we would leak
3821 if (index_in_top_level->count > 0) {
3822 TALLOC_FREE(index_in_top_level->dn);
3824 index_in_top_level->dn
3825 = talloc_steal(index_in_top_level,
3826 index_in_subtransaction->dn);
3827 index_in_top_level->count = index_in_subtransaction->count;
3828 return 0;
3831 index_in_top_level = talloc(ldb_kv->idxptr, struct dn_list);
3832 if (index_in_top_level == NULL) {
3833 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
3834 return -1;
3836 index_in_top_level->dn
3837 = talloc_steal(index_in_top_level,
3838 index_in_subtransaction->dn);
3839 index_in_top_level->count = index_in_subtransaction->count;
3841 rec.dptr = (uint8_t *)&index_in_top_level;
3842 rec.dsize = sizeof(void *);
3846 * This is not a store into the main DB, but into an in-memory
3847 * TDB, so we don't need a guard on ltdb->read_only
3849 ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
3850 if (ret != 0) {
3851 ldb_kv->idxptr->error = ltdb_err_map(
3852 tdb_error(ldb_kv->idxptr->itdb));
3853 return -1;
3855 return 0;
3859 * Initialise the index cache for a sub transaction.
3861 int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
3863 ldb_kv->nested_idx_ptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
3864 if (ldb_kv->nested_idx_ptr == NULL) {
3865 return LDB_ERR_OPERATIONS_ERROR;
3869 * We use a tiny hash size for the sub-database (11).
3871 * The sub-transaction is only for one record at a time, we
3872 * would use a linked list but that would make the code even
3873 * more complex when manipulating the index, as it would have
3874 * to know if we were in a nested transaction (normal
3875 * operations) or the top one (a reindex).
3877 ldb_kv->nested_idx_ptr->itdb =
3878 tdb_open(NULL, 11, TDB_INTERNAL, O_RDWR, 0);
3879 if (ldb_kv->nested_idx_ptr->itdb == NULL) {
3880 return LDB_ERR_OPERATIONS_ERROR;
3882 return LDB_SUCCESS;
3886 * Clear the contents of the nested transaction index cache when the nested
3887 * transaction is cancelled.
3889 int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
3891 if (ldb_kv->nested_idx_ptr != NULL) {
3892 tdb_close(ldb_kv->nested_idx_ptr->itdb);
3893 TALLOC_FREE(ldb_kv->nested_idx_ptr);
3895 return LDB_SUCCESS;
3899 * Commit a nested transaction,
3900 * Copy the contents of the nested transaction index cache to the
3901 * transaction index cache.
3903 int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
3905 int ret = 0;
3907 if (ldb_kv->nested_idx_ptr == NULL) {
3908 return LDB_SUCCESS;
3910 if (ldb_kv->nested_idx_ptr->itdb == NULL) {
3911 return LDB_SUCCESS;
3913 tdb_traverse(
3914 ldb_kv->nested_idx_ptr->itdb,
3915 ldb_kv_sub_transaction_traverse,
3916 ldb_kv->module);
3917 tdb_close(ldb_kv->nested_idx_ptr->itdb);
3918 ldb_kv->nested_idx_ptr->itdb = NULL;
3920 ret = ldb_kv->nested_idx_ptr->error;
3921 if (ret != LDB_SUCCESS) {
3922 struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
3923 if (!ldb_errstring(ldb)) {
3924 ldb_set_errstring(ldb, ldb_strerror(ret));
3926 ldb_asprintf_errstring(
3927 ldb,
3928 __location__": Failed to update index records in "
3929 "sub transaction commit: %s",
3930 ldb_errstring(ldb));
3932 TALLOC_FREE(ldb_kv->nested_idx_ptr);
3933 return ret;