s3-krb5: Fix Coverity #722 (RESOURCE_LEAK).
[Samba/gbeck.git] / source3 / lib / dbwrap_ctdb.c
blob4a5bf6d81a793d8d3b46f3e31cc594b70f60553d
1 /*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
  State of one open transaction on a ctdb-backed database.
*/
struct db_ctdb_transaction_handle {
	/* database context this transaction belongs to */
	struct db_ctdb_ctx *ctx;
	/* true while the recorded operations are being replayed; the
	   fetch/store helpers do not append to m_all in that mode */
	bool in_replay;
	/* log of the operations done under the transaction:
	   m_all holds both reads and writes, m_write holds only the
	   writes (with the record header included) */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* number of nested transaction_start calls still outstanding */
	uint32_t nesting;
	/* set when a nested level was cancelled; the outermost commit
	   then fails */
	bool nested_cancel;
};
38 struct db_ctdb_ctx {
39 struct db_context *db;
40 struct tdb_wrap *wtdb;
41 uint32 db_id;
42 struct db_ctdb_transaction_handle *transaction;
45 struct db_ctdb_rec {
46 struct db_ctdb_ctx *ctdb_ctx;
47 struct ctdb_ltdb_header header;
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
51 TALLOC_CTX *mem_ctx,
52 TDB_DATA key,
53 bool persistent);
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
57 NTSTATUS status;
58 enum TDB_ERROR tret = tdb_error(tdb);
60 switch (tret) {
61 case TDB_ERR_EXISTS:
62 status = NT_STATUS_OBJECT_NAME_COLLISION;
63 break;
64 case TDB_ERR_NOEXIST:
65 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66 break;
67 default:
68 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
69 break;
72 return status;
78 form a ctdb_rec_data record from a key/data pair
80 note that header may be NULL. If not NULL then it is included in the data portion
81 of the record
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
84 TDB_DATA key,
85 struct ctdb_ltdb_header *header,
86 TDB_DATA data)
88 size_t length;
89 struct ctdb_rec_data *d;
91 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
92 data.dsize + (header?sizeof(*header):0);
93 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
94 if (d == NULL) {
95 return NULL;
97 d->length = length;
98 d->reqid = reqid;
99 d->keylen = key.dsize;
100 memcpy(&d->data[0], key.dptr, key.dsize);
101 if (header) {
102 d->datalen = data.dsize + sizeof(*header);
103 memcpy(&d->data[key.dsize], header, sizeof(*header));
104 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
105 } else {
106 d->datalen = data.dsize;
107 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
109 return d;
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
115 struct ctdb_marshall_buffer *m,
116 uint64_t db_id,
117 uint32_t reqid,
118 TDB_DATA key,
119 struct ctdb_ltdb_header *header,
120 TDB_DATA data)
122 struct ctdb_rec_data *r;
123 size_t m_size, r_size;
124 struct ctdb_marshall_buffer *m2 = NULL;
126 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
127 if (r == NULL) {
128 talloc_free(m);
129 return NULL;
132 if (m == NULL) {
133 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
134 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
135 if (m == NULL) {
136 goto done;
138 m->db_id = db_id;
141 m_size = talloc_get_size(m);
142 r_size = talloc_get_size(r);
144 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
145 mem_ctx, m, m_size + r_size);
146 if (m2 == NULL) {
147 talloc_free(m);
148 goto done;
151 memcpy(m_size + (uint8_t *)m2, r, r_size);
153 m2->count++;
155 done:
156 talloc_free(r);
157 return m2;
160 /* we've finished marshalling, return a data blob with the marshalled records */
161 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
163 TDB_DATA data;
164 data.dptr = (uint8_t *)m;
165 data.dsize = talloc_get_size(m);
166 return data;
170 loop over a marshalling buffer
172 - pass r==NULL to start
173 - loop the number of times indicated by m->count
175 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
176 uint32_t *reqid,
177 struct ctdb_ltdb_header *header,
178 TDB_DATA *key, TDB_DATA *data)
180 if (r == NULL) {
181 r = (struct ctdb_rec_data *)&m->data[0];
182 } else {
183 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
186 if (reqid != NULL) {
187 *reqid = r->reqid;
190 if (key != NULL) {
191 key->dptr = &r->data[0];
192 key->dsize = r->keylen;
194 if (data != NULL) {
195 data->dptr = &r->data[r->keylen];
196 data->dsize = r->datalen;
197 if (header != NULL) {
198 data->dptr += sizeof(*header);
199 data->dsize -= sizeof(*header);
203 if (header != NULL) {
204 if (r->datalen < sizeof(*header)) {
205 return NULL;
207 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
210 return r;
215 /* start a transaction on a database */
216 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
218 tdb_transaction_cancel(h->ctx->wtdb->tdb);
219 return 0;
222 /* start a transaction on a database */
223 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
225 struct db_record *rh;
226 TDB_DATA key;
227 TALLOC_CTX *tmp_ctx;
228 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
229 int ret;
230 struct db_ctdb_ctx *ctx = h->ctx;
231 TDB_DATA data;
233 key.dptr = (uint8_t *)discard_const(keyname);
234 key.dsize = strlen(keyname);
236 again:
237 tmp_ctx = talloc_new(h);
239 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
240 if (rh == NULL) {
241 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
242 talloc_free(tmp_ctx);
243 return -1;
245 talloc_free(rh);
247 ret = tdb_transaction_start(ctx->wtdb->tdb);
248 if (ret != 0) {
249 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
250 talloc_free(tmp_ctx);
251 return -1;
254 data = tdb_fetch(ctx->wtdb->tdb, key);
255 if ((data.dptr == NULL) ||
256 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
257 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
258 SAFE_FREE(data.dptr);
259 tdb_transaction_cancel(ctx->wtdb->tdb);
260 talloc_free(tmp_ctx);
261 goto again;
264 SAFE_FREE(data.dptr);
265 talloc_free(tmp_ctx);
267 return 0;
271 /* start a transaction on a database */
272 static int db_ctdb_transaction_start(struct db_context *db)
274 struct db_ctdb_transaction_handle *h;
275 int ret;
276 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
277 struct db_ctdb_ctx);
279 if (!db->persistent) {
280 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
281 ctx->db_id));
282 return -1;
285 if (ctx->transaction) {
286 ctx->transaction->nesting++;
287 return 0;
290 h = talloc_zero(db, struct db_ctdb_transaction_handle);
291 if (h == NULL) {
292 DEBUG(0,(__location__ " oom for transaction handle\n"));
293 return -1;
296 h->ctx = ctx;
298 ret = db_ctdb_transaction_fetch_start(h);
299 if (ret != 0) {
300 talloc_free(h);
301 return -1;
304 talloc_set_destructor(h, db_ctdb_transaction_destructor);
306 ctx->transaction = h;
308 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
310 return 0;
316 fetch a record inside a transaction
318 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
319 TALLOC_CTX *mem_ctx,
320 TDB_DATA key, TDB_DATA *data)
322 struct db_ctdb_transaction_handle *h = db->transaction;
324 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
326 if (data->dptr != NULL) {
327 uint8_t *oldptr = (uint8_t *)data->dptr;
328 data->dsize -= sizeof(struct ctdb_ltdb_header);
329 if (data->dsize == 0) {
330 data->dptr = NULL;
331 } else {
332 data->dptr = (uint8 *)
333 talloc_memdup(
334 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
335 data->dsize);
337 SAFE_FREE(oldptr);
338 if (data->dptr == NULL && data->dsize != 0) {
339 return -1;
343 if (!h->in_replay) {
344 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
345 if (h->m_all == NULL) {
346 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
347 data->dsize = 0;
348 talloc_free(data->dptr);
349 return -1;
353 return 0;
357 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
358 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
360 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
361 TALLOC_CTX *mem_ctx,
362 TDB_DATA key)
364 struct db_record *result;
365 TDB_DATA ctdb_data;
367 if (!(result = talloc(mem_ctx, struct db_record))) {
368 DEBUG(0, ("talloc failed\n"));
369 return NULL;
372 result->private_data = ctx->transaction;
374 result->key.dsize = key.dsize;
375 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
376 if (result->key.dptr == NULL) {
377 DEBUG(0, ("talloc failed\n"));
378 TALLOC_FREE(result);
379 return NULL;
382 result->store = db_ctdb_store_transaction;
383 result->delete_rec = db_ctdb_delete_transaction;
385 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
386 if (ctdb_data.dptr == NULL) {
387 /* create the record */
388 result->value = tdb_null;
389 return result;
392 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
393 result->value.dptr = NULL;
395 if ((result->value.dsize != 0)
396 && !(result->value.dptr = (uint8 *)talloc_memdup(
397 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
398 result->value.dsize))) {
399 DEBUG(0, ("talloc failed\n"));
400 TALLOC_FREE(result);
403 SAFE_FREE(ctdb_data.dptr);
405 return result;
408 static int db_ctdb_record_destructor(struct db_record **recp)
410 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
411 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
412 rec->private_data, struct db_ctdb_transaction_handle);
413 int ret = h->ctx->db->transaction_commit(h->ctx->db);
414 if (ret != 0) {
415 DEBUG(0,(__location__ " transaction_commit failed\n"));
417 return 0;
421 auto-create a transaction for persistent databases
423 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
424 TALLOC_CTX *mem_ctx,
425 TDB_DATA key)
427 int res;
428 struct db_record *rec, **recp;
430 res = db_ctdb_transaction_start(ctx->db);
431 if (res == -1) {
432 return NULL;
435 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
436 if (rec == NULL) {
437 ctx->db->transaction_cancel(ctx->db);
438 return NULL;
441 /* destroy this transaction when we release the lock */
442 recp = talloc(rec, struct db_record *);
443 if (recp == NULL) {
444 ctx->db->transaction_cancel(ctx->db);
445 talloc_free(rec);
446 return NULL;
448 *recp = rec;
449 talloc_set_destructor(recp, db_ctdb_record_destructor);
450 return rec;
455 stores a record inside a transaction
457 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
458 TDB_DATA key, TDB_DATA data)
460 TALLOC_CTX *tmp_ctx = talloc_new(h);
461 int ret;
462 TDB_DATA rec;
463 struct ctdb_ltdb_header header;
465 /* we need the header so we can update the RSN */
466 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
467 if (rec.dptr == NULL) {
468 /* the record doesn't exist - create one with us as dmaster.
469 This is only safe because we are in a transaction and this
470 is a persistent database */
471 ZERO_STRUCT(header);
472 header.dmaster = get_my_vnn();
473 } else {
474 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
475 rec.dsize -= sizeof(struct ctdb_ltdb_header);
476 /* a special case, we are writing the same data that is there now */
477 if (data.dsize == rec.dsize &&
478 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
479 SAFE_FREE(rec.dptr);
480 talloc_free(tmp_ctx);
481 return 0;
483 SAFE_FREE(rec.dptr);
486 header.rsn++;
488 if (!h->in_replay) {
489 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
490 if (h->m_all == NULL) {
491 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
492 talloc_free(tmp_ctx);
493 return -1;
497 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
498 if (h->m_write == NULL) {
499 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
500 talloc_free(tmp_ctx);
501 return -1;
504 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
505 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
506 if (rec.dptr == NULL) {
507 DEBUG(0,(__location__ " Failed to alloc record\n"));
508 talloc_free(tmp_ctx);
509 return -1;
511 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
512 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
514 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
516 talloc_free(tmp_ctx);
518 return ret;
523 a record store inside a transaction
525 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
527 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
528 rec->private_data, struct db_ctdb_transaction_handle);
529 int ret;
531 ret = db_ctdb_transaction_store(h, rec->key, data);
532 if (ret != 0) {
533 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
535 return NT_STATUS_OK;
539 a record delete inside a transaction
541 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
543 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
544 rec->private_data, struct db_ctdb_transaction_handle);
545 int ret;
547 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
548 if (ret != 0) {
549 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
551 return NT_STATUS_OK;
556 replay a transaction
558 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
560 int ret, i;
561 struct ctdb_rec_data *rec = NULL;
563 h->in_replay = true;
564 talloc_free(h->m_write);
565 h->m_write = NULL;
567 ret = db_ctdb_transaction_fetch_start(h);
568 if (ret != 0) {
569 return ret;
572 for (i=0;i<h->m_all->count;i++) {
573 TDB_DATA key, data;
575 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
576 if (rec == NULL) {
577 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
578 goto failed;
581 if (rec->reqid == 0) {
582 /* its a store */
583 if (db_ctdb_transaction_store(h, key, data) != 0) {
584 goto failed;
586 } else {
587 TDB_DATA data2;
588 TALLOC_CTX *tmp_ctx = talloc_new(h);
590 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
591 talloc_free(tmp_ctx);
592 goto failed;
594 if (data2.dsize != data.dsize ||
595 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
596 /* the record has changed on us - we have to give up */
597 talloc_free(tmp_ctx);
598 goto failed;
600 talloc_free(tmp_ctx);
604 return 0;
606 failed:
607 tdb_transaction_cancel(h->ctx->wtdb->tdb);
608 return -1;
613 commit a transaction
615 static int db_ctdb_transaction_commit(struct db_context *db)
617 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
618 struct db_ctdb_ctx);
619 NTSTATUS rets;
620 int ret;
621 int status;
622 int retries = 0;
623 struct db_ctdb_transaction_handle *h = ctx->transaction;
624 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
626 if (h == NULL) {
627 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
628 return -1;
631 if (h->nested_cancel) {
632 db->transaction_cancel(db);
633 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
634 return -1;
637 if (h->nesting != 0) {
638 h->nesting--;
639 return 0;
642 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
644 talloc_set_destructor(h, NULL);
646 /* our commit strategy is quite complex.
648 - we first try to commit the changes to all other nodes
650 - if that works, then we commit locally and we are done
652 - if a commit on another node fails, then we need to cancel
653 the transaction, then restart the transaction (thus
654 opening a window of time for a pending recovery to
655 complete), then replay the transaction, checking all the
656 reads and writes (checking that reads give the same data,
657 and writes succeed). Then we retry the transaction to the
658 other nodes
661 again:
662 if (h->m_write == NULL) {
663 /* no changes were made, potentially after a retry */
664 tdb_transaction_cancel(h->ctx->wtdb->tdb);
665 talloc_free(h);
666 ctx->transaction = NULL;
667 return 0;
670 /* tell ctdbd to commit to the other nodes */
671 rets = ctdbd_control_local(messaging_ctdbd_connection(),
672 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
673 h->ctx->db_id, 0,
674 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
675 if (!NT_STATUS_IS_OK(rets) || status != 0) {
676 tdb_transaction_cancel(h->ctx->wtdb->tdb);
677 sleep(1);
679 if (!NT_STATUS_IS_OK(rets)) {
680 failure_control = CTDB_CONTROL_TRANS2_ERROR;
681 } else {
682 /* work out what error code we will give if we
683 have to fail the operation */
684 switch ((enum ctdb_trans2_commit_error)status) {
685 case CTDB_TRANS2_COMMIT_SUCCESS:
686 case CTDB_TRANS2_COMMIT_SOMEFAIL:
687 case CTDB_TRANS2_COMMIT_TIMEOUT:
688 failure_control = CTDB_CONTROL_TRANS2_ERROR;
689 break;
690 case CTDB_TRANS2_COMMIT_ALLFAIL:
691 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
692 break;
696 if (++retries == 5) {
697 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
698 h->ctx->db_id, retries, (unsigned)failure_control));
699 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
700 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
701 tdb_null, NULL, NULL, NULL);
702 h->ctx->transaction = NULL;
703 talloc_free(h);
704 ctx->transaction = NULL;
705 return -1;
708 if (ctdb_replay_transaction(h) != 0) {
709 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
710 (unsigned)failure_control));
711 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
712 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
713 tdb_null, NULL, NULL, NULL);
714 h->ctx->transaction = NULL;
715 talloc_free(h);
716 ctx->transaction = NULL;
717 return -1;
719 goto again;
720 } else {
721 failure_control = CTDB_CONTROL_TRANS2_ERROR;
724 /* do the real commit locally */
725 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
726 if (ret != 0) {
727 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
728 (unsigned)failure_control));
729 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
730 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
731 h->ctx->transaction = NULL;
732 talloc_free(h);
733 return ret;
736 /* tell ctdbd that we are finished with our local commit */
737 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
738 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
739 tdb_null, NULL, NULL, NULL);
740 h->ctx->transaction = NULL;
741 talloc_free(h);
742 return 0;
747 cancel a transaction
749 static int db_ctdb_transaction_cancel(struct db_context *db)
751 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
752 struct db_ctdb_ctx);
753 struct db_ctdb_transaction_handle *h = ctx->transaction;
755 if (h == NULL) {
756 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
757 return -1;
760 if (h->nesting != 0) {
761 h->nesting--;
762 h->nested_cancel = true;
763 return 0;
766 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
768 ctx->transaction = NULL;
769 talloc_free(h);
770 return 0;
774 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
776 struct db_ctdb_rec *crec = talloc_get_type_abort(
777 rec->private_data, struct db_ctdb_rec);
778 TDB_DATA cdata;
779 int ret;
781 cdata.dsize = sizeof(crec->header) + data.dsize;
783 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
784 return NT_STATUS_NO_MEMORY;
787 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
788 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
790 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
792 SAFE_FREE(cdata.dptr);
794 return (ret == 0) ? NT_STATUS_OK
795 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
800 static NTSTATUS db_ctdb_delete(struct db_record *rec)
802 TDB_DATA data;
805 * We have to store the header with empty data. TODO: Fix the
806 * tdb-level cleanup
809 ZERO_STRUCT(data);
811 return db_ctdb_store(rec, data, 0);
815 static int db_ctdb_record_destr(struct db_record* data)
817 struct db_ctdb_rec *crec = talloc_get_type_abort(
818 data->private_data, struct db_ctdb_rec);
820 DEBUG(10, (DEBUGLEVEL > 10
821 ? "Unlocking db %u key %s\n"
822 : "Unlocking db %u key %.20s\n",
823 (int)crec->ctdb_ctx->db_id,
824 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
825 data->key.dsize)));
827 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
828 DEBUG(0, ("tdb_chainunlock failed\n"));
829 return -1;
832 return 0;
835 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
836 TALLOC_CTX *mem_ctx,
837 TDB_DATA key,
838 bool persistent)
840 struct db_record *result;
841 struct db_ctdb_rec *crec;
842 NTSTATUS status;
843 TDB_DATA ctdb_data;
844 int migrate_attempts = 0;
846 if (!(result = talloc(mem_ctx, struct db_record))) {
847 DEBUG(0, ("talloc failed\n"));
848 return NULL;
851 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
852 DEBUG(0, ("talloc failed\n"));
853 TALLOC_FREE(result);
854 return NULL;
857 result->private_data = (void *)crec;
858 crec->ctdb_ctx = ctx;
860 result->key.dsize = key.dsize;
861 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
862 if (result->key.dptr == NULL) {
863 DEBUG(0, ("talloc failed\n"));
864 TALLOC_FREE(result);
865 return NULL;
869 * Do a blocking lock on the record
871 again:
873 if (DEBUGLEVEL >= 10) {
874 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
875 DEBUG(10, (DEBUGLEVEL > 10
876 ? "Locking db %u key %s\n"
877 : "Locking db %u key %.20s\n",
878 (int)crec->ctdb_ctx->db_id, keystr));
879 TALLOC_FREE(keystr);
882 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
883 DEBUG(3, ("tdb_chainlock failed\n"));
884 TALLOC_FREE(result);
885 return NULL;
888 result->store = db_ctdb_store;
889 result->delete_rec = db_ctdb_delete;
890 talloc_set_destructor(result, db_ctdb_record_destr);
892 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
895 * See if we have a valid record and we are the dmaster. If so, we can
896 * take the shortcut and just return it.
899 if ((ctdb_data.dptr == NULL) ||
900 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
901 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
902 #if 0
903 || (random() % 2 != 0)
904 #endif
906 SAFE_FREE(ctdb_data.dptr);
907 tdb_chainunlock(ctx->wtdb->tdb, key);
908 talloc_set_destructor(result, NULL);
910 migrate_attempts += 1;
912 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
913 ctdb_data.dptr, ctdb_data.dptr ?
914 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
915 get_my_vnn()));
917 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
918 if (!NT_STATUS_IS_OK(status)) {
919 DEBUG(5, ("ctdb_migrate failed: %s\n",
920 nt_errstr(status)));
921 TALLOC_FREE(result);
922 return NULL;
924 /* now its migrated, try again */
925 goto again;
928 if (migrate_attempts > 10) {
929 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
930 migrate_attempts));
933 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
935 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
936 result->value.dptr = NULL;
938 if ((result->value.dsize != 0)
939 && !(result->value.dptr = (uint8 *)talloc_memdup(
940 result, ctdb_data.dptr + sizeof(crec->header),
941 result->value.dsize))) {
942 DEBUG(0, ("talloc failed\n"));
943 TALLOC_FREE(result);
946 SAFE_FREE(ctdb_data.dptr);
948 return result;
951 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
952 TALLOC_CTX *mem_ctx,
953 TDB_DATA key)
955 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
956 struct db_ctdb_ctx);
958 if (ctx->transaction != NULL) {
959 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
962 if (db->persistent) {
963 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
966 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
970 fetch (unlocked, no migration) operation on ctdb
972 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
973 TDB_DATA key, TDB_DATA *data)
975 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
976 struct db_ctdb_ctx);
977 NTSTATUS status;
978 TDB_DATA ctdb_data;
980 if (ctx->transaction) {
981 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
984 /* try a direct fetch */
985 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
988 * See if we have a valid record and we are the dmaster. If so, we can
989 * take the shortcut and just return it.
990 * we bypass the dmaster check for persistent databases
992 if ((ctdb_data.dptr != NULL) &&
993 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
994 (db->persistent ||
995 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
996 /* we are the dmaster - avoid the ctdb protocol op */
998 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
999 if (data->dsize == 0) {
1000 SAFE_FREE(ctdb_data.dptr);
1001 data->dptr = NULL;
1002 return 0;
1005 data->dptr = (uint8 *)talloc_memdup(
1006 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1007 data->dsize);
1009 SAFE_FREE(ctdb_data.dptr);
1011 if (data->dptr == NULL) {
1012 return -1;
1014 return 0;
1017 SAFE_FREE(ctdb_data.dptr);
1019 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1020 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1021 if (!NT_STATUS_IS_OK(status)) {
1022 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1023 return -1;
1026 return 0;
/*
  State handed to the traverse callbacks below.
*/
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* function of the caller to run for each record */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1035 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1037 struct traverse_state *state = (struct traverse_state *)private_data;
1038 struct db_record *rec;
1039 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1040 /* we have to give them a locked record to prevent races */
1041 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1042 if (rec && rec->value.dsize > 0) {
1043 state->fn(rec, state->private_data);
1045 talloc_free(tmp_ctx);
1048 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1049 void *private_data)
1051 struct traverse_state *state = (struct traverse_state *)private_data;
1052 struct db_record *rec;
1053 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1054 int ret = 0;
1055 /* we have to give them a locked record to prevent races */
1056 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1057 if (rec && rec->value.dsize > 0) {
1058 ret = state->fn(rec, state->private_data);
1060 talloc_free(tmp_ctx);
1061 return ret;
1064 static int db_ctdb_traverse(struct db_context *db,
1065 int (*fn)(struct db_record *rec,
1066 void *private_data),
1067 void *private_data)
1069 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1070 struct db_ctdb_ctx);
1071 struct traverse_state state;
1073 state.db = db;
1074 state.fn = fn;
1075 state.private_data = private_data;
1077 if (db->persistent) {
1078 /* for persistent databases we don't need to do a ctdb traverse,
1079 we can do a faster local traverse */
1080 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1084 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1085 return 0;
1088 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1090 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1093 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1095 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1098 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1100 struct traverse_state *state = (struct traverse_state *)private_data;
1101 struct db_record rec;
1102 rec.key = key;
1103 rec.value = data;
1104 rec.store = db_ctdb_store_deny;
1105 rec.delete_rec = db_ctdb_delete_deny;
1106 rec.private_data = state->db;
1107 state->fn(&rec, state->private_data);
1110 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1111 void *private_data)
1113 struct traverse_state *state = (struct traverse_state *)private_data;
1114 struct db_record rec;
1115 rec.key = kbuf;
1116 rec.value = dbuf;
1117 rec.store = db_ctdb_store_deny;
1118 rec.delete_rec = db_ctdb_delete_deny;
1119 rec.private_data = state->db;
1121 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1122 /* a deleted record */
1123 return 0;
1125 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1126 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1128 return state->fn(&rec, state->private_data);
1131 static int db_ctdb_traverse_read(struct db_context *db,
1132 int (*fn)(struct db_record *rec,
1133 void *private_data),
1134 void *private_data)
1136 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1137 struct db_ctdb_ctx);
1138 struct traverse_state state;
1140 state.db = db;
1141 state.fn = fn;
1142 state.private_data = private_data;
1144 if (db->persistent) {
1145 /* for persistent databases we don't need to do a ctdb traverse,
1146 we can do a faster local traverse */
1147 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1150 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1151 return 0;
1154 static int db_ctdb_get_seqnum(struct db_context *db)
1156 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1157 struct db_ctdb_ctx);
1158 return tdb_get_seqnum(ctx->wtdb->tdb);
1161 static int db_ctdb_get_flags(struct db_context *db)
1163 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1164 struct db_ctdb_ctx);
1165 return tdb_get_flags(ctx->wtdb->tdb);
1168 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1169 const char *name,
1170 int hash_size, int tdb_flags,
1171 int open_flags, mode_t mode)
1173 struct db_context *result;
1174 struct db_ctdb_ctx *db_ctdb;
1175 char *db_path;
1177 if (!lp_clustering()) {
1178 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1179 return NULL;
1182 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1183 DEBUG(0, ("talloc failed\n"));
1184 TALLOC_FREE(result);
1185 return NULL;
1188 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1189 DEBUG(0, ("talloc failed\n"));
1190 TALLOC_FREE(result);
1191 return NULL;
1194 db_ctdb->transaction = NULL;
1195 db_ctdb->db = result;
1197 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1198 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1199 TALLOC_FREE(result);
1200 return NULL;
1203 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1205 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1207 /* only pass through specific flags */
1208 tdb_flags &= TDB_SEQNUM;
1210 /* honor permissions if user has specified O_CREAT */
1211 if (open_flags & O_CREAT) {
1212 chmod(db_path, mode);
1215 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1216 if (db_ctdb->wtdb == NULL) {
1217 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1218 TALLOC_FREE(result);
1219 return NULL;
1221 talloc_free(db_path);
1223 result->private_data = (void *)db_ctdb;
1224 result->fetch_locked = db_ctdb_fetch_locked;
1225 result->fetch = db_ctdb_fetch;
1226 result->traverse = db_ctdb_traverse;
1227 result->traverse_read = db_ctdb_traverse_read;
1228 result->get_seqnum = db_ctdb_get_seqnum;
1229 result->get_flags = db_ctdb_get_flags;
1230 result->transaction_start = db_ctdb_transaction_start;
1231 result->transaction_commit = db_ctdb_transaction_commit;
1232 result->transaction_cancel = db_ctdb_transaction_cancel;
1234 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1235 name, db_ctdb->db_id));
1237 return result;
1239 #endif