s3-build: fix the build of vfs_notify_fam.
[Samba/ita.git] / source3 / lib / ldb / modules / paged_results.c
blobe16b7104131f8d9405622cb2b8d54fa20e8f2170
1 /*
2 ldb database library
4 Copyright (C) Simo Sorce 2005-2006
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
8 ** under the LGPL
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 3 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 * Name: paged_result
27 * Component: ldb paged results control module
29 * Description: this module caches a complete search and sends back
30 * results in chunks as asked by the client
32 * Author: Simo Sorce
35 #include "includes.h"
36 #include "ldb/include/includes.h"
38 struct message_store {
39 /* keep the whole ldb_reply as an optimization
40 * instead of freeing and talloc-ing the container
41 * on each result */
42 struct ldb_reply *r;
43 struct message_store *next;
46 struct private_data;
48 struct results_store {
50 struct private_data *priv;
52 char *cookie;
53 time_t timestamp;
55 struct results_store *prev;
56 struct results_store *next;
58 struct message_store *first;
59 struct message_store *last;
60 int num_entries;
62 struct message_store *first_ref;
63 struct message_store *last_ref;
65 struct ldb_control **controls;
67 struct ldb_request *req;
70 struct private_data {
72 int next_free_id;
73 struct results_store *store;
77 int store_destructor(struct results_store *store);
79 int store_destructor(struct results_store *store)
81 DLIST_REMOVE(store->priv->store, store);
82 return 0;
85 static struct results_store *new_store(struct private_data *priv)
87 struct results_store *newr;
88 int new_id = priv->next_free_id++;
90 /* TODO: we should have a limit on the number of
91 * outstanding paged searches
94 newr = talloc(priv, struct results_store);
95 if (!newr) return NULL;
97 newr->priv = priv;
99 newr->cookie = talloc_asprintf(newr, "%d", new_id);
100 if (!newr->cookie) {
101 talloc_free(newr);
102 return NULL;
105 newr->timestamp = time(NULL);
107 newr->first = NULL;
108 newr->num_entries = 0;
109 newr->first_ref = NULL;
110 newr->controls = NULL;
112 /* put this entry as first */
113 DLIST_ADD(priv->store, newr);
115 talloc_set_destructor(newr, store_destructor);
117 return newr;
120 struct paged_context {
121 struct ldb_module *module;
122 void *up_context;
123 int (*up_callback)(struct ldb_context *, void *, struct ldb_reply *);
125 int size;
127 struct results_store *store;
130 static struct ldb_handle *init_handle(void *mem_ctx, struct ldb_module *module,
131 void *context,
132 int (*callback)(struct ldb_context *, void *, struct ldb_reply *))
134 struct paged_context *ac;
135 struct ldb_handle *h;
137 h = talloc_zero(mem_ctx, struct ldb_handle);
138 if (h == NULL) {
139 ldb_set_errstring(module->ldb, "Out of Memory");
140 return NULL;
143 h->module = module;
145 ac = talloc_zero(h, struct paged_context);
146 if (ac == NULL) {
147 ldb_set_errstring(module->ldb, "Out of Memory");
148 talloc_free(h);
149 return NULL;
152 h->private_data = (void *)ac;
154 h->state = LDB_ASYNC_INIT;
155 h->status = LDB_SUCCESS;
157 ac->module = module;
158 ac->up_context = context;
159 ac->up_callback = callback;
161 return h;
164 static int paged_search_callback(struct ldb_context *ldb, void *context, struct ldb_reply *ares)
166 struct paged_context *ac = NULL;
168 if (!context || !ares) {
169 ldb_set_errstring(ldb, "NULL Context or Result in callback");
170 goto error;
173 ac = talloc_get_type(context, struct paged_context);
175 if (ares->type == LDB_REPLY_ENTRY) {
176 if (ac->store->first == NULL) {
177 ac->store->first = ac->store->last = talloc(ac->store, struct message_store);
178 } else {
179 ac->store->last->next = talloc(ac->store, struct message_store);
180 ac->store->last = ac->store->last->next;
182 if (ac->store->last == NULL) {
183 goto error;
186 ac->store->num_entries++;
188 ac->store->last->r = talloc_steal(ac->store->last, ares);
189 ac->store->last->next = NULL;
192 if (ares->type == LDB_REPLY_REFERRAL) {
193 if (ac->store->first_ref == NULL) {
194 ac->store->first_ref = ac->store->last_ref = talloc(ac->store, struct message_store);
195 } else {
196 ac->store->last_ref->next = talloc(ac->store, struct message_store);
197 ac->store->last_ref = ac->store->last_ref->next;
199 if (ac->store->last_ref == NULL) {
200 goto error;
203 ac->store->last_ref->r = talloc_steal(ac->store->last, ares);
204 ac->store->last_ref->next = NULL;
207 if (ares->type == LDB_REPLY_DONE) {
208 ac->store->controls = talloc_move(ac->store, &ares->controls);
209 talloc_free(ares);
212 return LDB_SUCCESS;
214 error:
215 talloc_free(ares);
216 return LDB_ERR_OPERATIONS_ERROR;
219 static int paged_search(struct ldb_module *module, struct ldb_request *req)
221 struct ldb_control *control;
222 struct private_data *private_data;
223 struct ldb_paged_control *paged_ctrl;
224 struct ldb_control **saved_controls;
225 struct paged_context *ac;
226 struct ldb_handle *h;
227 int ret;
229 /* check if there's a paged request control */
230 control = get_control_from_list(req->controls, LDB_CONTROL_PAGED_RESULTS_OID);
231 if (control == NULL) {
232 /* not found go on */
233 return ldb_next_request(module, req);
236 private_data = talloc_get_type(module->private_data, struct private_data);
238 req->handle = NULL;
240 if (!req->callback || !req->context) {
241 ldb_set_errstring(module->ldb,
242 "Async interface called with NULL callback function or NULL context");
243 return LDB_ERR_OPERATIONS_ERROR;
246 paged_ctrl = talloc_get_type(control->data, struct ldb_paged_control);
247 if (!paged_ctrl) {
248 return LDB_ERR_PROTOCOL_ERROR;
251 h = init_handle(req, module, req->context, req->callback);
252 if (!h) {
253 return LDB_ERR_OPERATIONS_ERROR;
255 ac = talloc_get_type(h->private_data, struct paged_context);
257 ac->size = paged_ctrl->size;
259 /* check if it is a continuation search the store */
260 if (paged_ctrl->cookie_len == 0) {
262 ac->store = new_store(private_data);
263 if (ac->store == NULL) {
264 talloc_free(h);
265 return LDB_ERR_UNWILLING_TO_PERFORM;
268 ac->store->req = talloc(ac->store, struct ldb_request);
269 if (!ac->store->req)
270 return LDB_ERR_OPERATIONS_ERROR;
272 ac->store->req->operation = req->operation;
273 ac->store->req->op.search.base = req->op.search.base;
274 ac->store->req->op.search.scope = req->op.search.scope;
275 ac->store->req->op.search.tree = req->op.search.tree;
276 ac->store->req->op.search.attrs = req->op.search.attrs;
277 ac->store->req->controls = req->controls;
279 /* save it locally and remove it from the list */
280 /* we do not need to replace them later as we
281 * are keeping the original req intact */
282 if (!save_controls(control, ac->store->req, &saved_controls)) {
283 return LDB_ERR_OPERATIONS_ERROR;
286 ac->store->req->context = ac;
287 ac->store->req->callback = paged_search_callback;
288 ldb_set_timeout_from_prev_req(module->ldb, req, ac->store->req);
290 ret = ldb_next_request(module, ac->store->req);
292 } else {
293 struct results_store *current = NULL;
295 for (current = private_data->store; current; current = current->next) {
296 if (strcmp(current->cookie, paged_ctrl->cookie) == 0) {
297 current->timestamp = time(NULL);
298 break;
301 if (current == NULL) {
302 talloc_free(h);
303 return LDB_ERR_UNWILLING_TO_PERFORM;
306 ac->store = current;
307 ret = LDB_SUCCESS;
310 req->handle = h;
312 /* check if it is an abandon */
313 if (ac->size == 0) {
314 talloc_free(ac->store);
315 h->status = LDB_SUCCESS;
316 h->state = LDB_ASYNC_DONE;
317 return LDB_SUCCESS;
320 /* TODO: age out old outstanding requests */
322 return ret;
326 static int paged_results(struct ldb_handle *handle)
328 struct paged_context *ac;
329 struct ldb_paged_control *paged;
330 struct ldb_reply *ares;
331 struct message_store *msg;
332 int i, num_ctrls, ret;
334 ac = talloc_get_type(handle->private_data, struct paged_context);
336 if (ac->store == NULL)
337 return LDB_ERR_OPERATIONS_ERROR;
339 while (ac->store->num_entries > 0 && ac->size > 0) {
340 msg = ac->store->first;
341 ret = ac->up_callback(ac->module->ldb, ac->up_context, msg->r);
342 if (ret != LDB_SUCCESS) {
343 handle->status = ret;
344 handle->state = LDB_ASYNC_DONE;
345 return ret;
348 ac->store->first = msg->next;
349 talloc_free(msg);
350 ac->store->num_entries--;
351 ac->size--;
354 handle->state = LDB_ASYNC_DONE;
356 while (ac->store->first_ref != NULL) {
357 msg = ac->store->first_ref;
358 ret = ac->up_callback(ac->module->ldb, ac->up_context, msg->r);
359 if (ret != LDB_SUCCESS) {
360 handle->status = ret;
361 handle->state = LDB_ASYNC_DONE;
362 return ret;
365 ac->store->first_ref = msg->next;
366 talloc_free(msg);
369 ares = talloc_zero(ac->store, struct ldb_reply);
370 if (ares == NULL) {
371 handle->status = LDB_ERR_OPERATIONS_ERROR;
372 return handle->status;
374 num_ctrls = 2;
375 i = 0;
377 if (ac->store->controls != NULL) {
378 ares->controls = ac->store->controls;
379 while (ares->controls[i]) i++; /* counting */
381 ares->controls = talloc_move(ares, &ac->store->controls);
382 num_ctrls += i;
385 ares->controls = talloc_realloc(ares, ares->controls, struct ldb_control *, num_ctrls);
386 if (ares->controls == NULL) {
387 handle->status = LDB_ERR_OPERATIONS_ERROR;
388 return handle->status;
391 ares->controls[i] = talloc(ares->controls, struct ldb_control);
392 if (ares->controls[i] == NULL) {
393 handle->status = LDB_ERR_OPERATIONS_ERROR;
394 return handle->status;
397 ares->controls[i]->oid = talloc_strdup(ares->controls[i], LDB_CONTROL_PAGED_RESULTS_OID);
398 if (ares->controls[i]->oid == NULL) {
399 handle->status = LDB_ERR_OPERATIONS_ERROR;
400 return handle->status;
403 ares->controls[i]->critical = 0;
404 ares->controls[i + 1] = NULL;
406 paged = talloc(ares->controls[i], struct ldb_paged_control);
407 if (paged == NULL) {
408 handle->status = LDB_ERR_OPERATIONS_ERROR;
409 return handle->status;
412 ares->controls[i]->data = paged;
414 if (ac->size > 0) {
415 paged->size = 0;
416 paged->cookie = NULL;
417 paged->cookie_len = 0;
418 } else {
419 paged->size = ac->store->num_entries;
420 paged->cookie = talloc_strdup(paged, ac->store->cookie);
421 paged->cookie_len = strlen(paged->cookie) + 1;
424 ares->type = LDB_REPLY_DONE;
426 ret = ac->up_callback(ac->module->ldb, ac->up_context, ares);
428 handle->status = ret;
430 return ret;
433 static int paged_wait(struct ldb_handle *handle, enum ldb_wait_type type)
435 struct paged_context *ac;
436 int ret;
438 if (!handle || !handle->private_data) {
439 return LDB_ERR_OPERATIONS_ERROR;
442 if (handle->state == LDB_ASYNC_DONE) {
443 return handle->status;
446 handle->state = LDB_ASYNC_PENDING;
448 ac = talloc_get_type(handle->private_data, struct paged_context);
450 if (ac->store->req->handle->state == LDB_ASYNC_DONE) {
451 /* if lower level is finished we do not need to call it anymore */
452 /* return all we have until size == 0 or we empty storage */
453 ret = paged_results(handle);
455 /* we are done, if num_entries is zero free the storage
456 * as that mean we delivered the last batch */
457 if (ac->store->num_entries == 0) {
458 talloc_free(ac->store);
461 return ret;
464 if (type == LDB_WAIT_ALL) {
465 while (ac->store->req->handle->state != LDB_ASYNC_DONE) {
466 ret = ldb_wait(ac->store->req->handle, type);
467 if (ret != LDB_SUCCESS) {
468 handle->state = LDB_ASYNC_DONE;
469 handle->status = ret;
470 return ret;
474 ret = paged_results(handle);
476 /* we are done, if num_entries is zero free the storage
477 * as that mean we delivered the last batch */
478 if (ac->store->num_entries == 0) {
479 talloc_free(ac->store);
482 return ret;
485 ret = ldb_wait(ac->store->req->handle, type);
486 if (ret != LDB_SUCCESS) {
487 handle->state = LDB_ASYNC_DONE;
488 handle->status = ret;
489 return ret;
492 handle->status = ret;
494 if (ac->store->num_entries >= ac->size ||
495 ac->store->req->handle->state == LDB_ASYNC_DONE) {
497 ret = paged_results(handle);
499 /* we are done, if num_entries is zero free the storage
500 * as that mean we delivered the last batch */
501 if (ac->store->num_entries == 0) {
502 talloc_free(ac->store);
506 return ret;
509 static int paged_request_init(struct ldb_module *module)
511 struct private_data *data;
512 struct ldb_request *req;
513 int ret;
515 data = talloc(module, struct private_data);
516 if (data == NULL) {
517 return LDB_ERR_OTHER;
520 data->next_free_id = 1;
521 data->store = NULL;
522 module->private_data = data;
524 req = talloc(module, struct ldb_request);
525 if (req == NULL) {
526 return LDB_ERR_OPERATIONS_ERROR;
529 req->operation = LDB_REQ_REGISTER_CONTROL;
530 req->op.reg_control.oid = LDB_CONTROL_PAGED_RESULTS_OID;
531 req->controls = NULL;
533 ret = ldb_request(module->ldb, req);
534 if (ret != LDB_SUCCESS) {
535 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "paged_request: Unable to register control with rootdse!\n");
538 talloc_free(req);
539 return ldb_next_init(module);
542 static const struct ldb_module_ops paged_ops = {
543 .name = "paged_results",
544 .search = paged_search,
545 .wait = paged_wait,
546 .init_context = paged_request_init
549 int ldb_paged_results_init(void)
551 return ldb_register_module(&paged_ops);