create-tarball: Adapt script to changed directory structure.
[Samba/gbeck.git] / source3 / lib / dbwrap_ctdb.c
blob63a5ce4de6782f858342447757cec563605d5a85
1 /*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * State of one open transaction on a ctdb backed database.
 */
struct db_ctdb_transaction_handle {
	/* database context this transaction was started on */
	struct db_ctdb_ctx *ctx;
	/* true while ctdb_replay_transaction() re-applies the operations */
	bool in_replay;
	/*
	 * We store the reads and writes done under a transaction:
	 * one list stores both reads and writes, the other just writes.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* number of nested transaction_start calls still outstanding */
	uint32_t nesting;
	/* set when a nested level was cancelled; makes the commit fail */
	bool nested_cancel;
};
38 struct db_ctdb_ctx {
39 struct db_context *db;
40 struct tdb_wrap *wtdb;
41 uint32 db_id;
42 struct db_ctdb_transaction_handle *transaction;
45 struct db_ctdb_rec {
46 struct db_ctdb_ctx *ctdb_ctx;
47 struct ctdb_ltdb_header header;
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
51 TALLOC_CTX *mem_ctx,
52 TDB_DATA key,
53 bool persistent);
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
57 NTSTATUS status;
58 enum TDB_ERROR tret = tdb_error(tdb);
60 switch (tret) {
61 case TDB_ERR_EXISTS:
62 status = NT_STATUS_OBJECT_NAME_COLLISION;
63 break;
64 case TDB_ERR_NOEXIST:
65 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66 break;
67 default:
68 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
69 break;
72 return status;
78 form a ctdb_rec_data record from a key/data pair
80 note that header may be NULL. If not NULL then it is included in the data portion
81 of the record
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
84 TDB_DATA key,
85 struct ctdb_ltdb_header *header,
86 TDB_DATA data)
88 size_t length;
89 struct ctdb_rec_data *d;
91 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
92 data.dsize + (header?sizeof(*header):0);
93 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
94 if (d == NULL) {
95 return NULL;
97 d->length = length;
98 d->reqid = reqid;
99 d->keylen = key.dsize;
100 memcpy(&d->data[0], key.dptr, key.dsize);
101 if (header) {
102 d->datalen = data.dsize + sizeof(*header);
103 memcpy(&d->data[key.dsize], header, sizeof(*header));
104 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
105 } else {
106 d->datalen = data.dsize;
107 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
109 return d;
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
115 struct ctdb_marshall_buffer *m,
116 uint64_t db_id,
117 uint32_t reqid,
118 TDB_DATA key,
119 struct ctdb_ltdb_header *header,
120 TDB_DATA data)
122 struct ctdb_rec_data *r;
123 size_t m_size, r_size;
124 struct ctdb_marshall_buffer *m2;
126 r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
127 if (r == NULL) {
128 talloc_free(m);
129 return NULL;
132 if (m == NULL) {
133 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
134 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
135 if (m == NULL) {
136 return NULL;
138 m->db_id = db_id;
141 m_size = talloc_get_size(m);
142 r_size = talloc_get_size(r);
144 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
145 mem_ctx, m, m_size + r_size);
146 if (m2 == NULL) {
147 talloc_free(m);
148 return NULL;
151 memcpy(m_size + (uint8_t *)m2, r, r_size);
153 talloc_free(r);
155 m2->count++;
157 return m2;
160 /* we've finished marshalling, return a data blob with the marshalled records */
161 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
163 TDB_DATA data;
164 data.dptr = (uint8_t *)m;
165 data.dsize = talloc_get_size(m);
166 return data;
170 loop over a marshalling buffer
172 - pass r==NULL to start
173 - loop the number of times indicated by m->count
175 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
176 uint32_t *reqid,
177 struct ctdb_ltdb_header *header,
178 TDB_DATA *key, TDB_DATA *data)
180 if (r == NULL) {
181 r = (struct ctdb_rec_data *)&m->data[0];
182 } else {
183 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
186 if (reqid != NULL) {
187 *reqid = r->reqid;
190 if (key != NULL) {
191 key->dptr = &r->data[0];
192 key->dsize = r->keylen;
194 if (data != NULL) {
195 data->dptr = &r->data[r->keylen];
196 data->dsize = r->datalen;
197 if (header != NULL) {
198 data->dptr += sizeof(*header);
199 data->dsize -= sizeof(*header);
203 if (header != NULL) {
204 if (r->datalen < sizeof(*header)) {
205 return NULL;
207 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
210 return r;
215 /* start a transaction on a database */
216 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
218 tdb_transaction_cancel(h->ctx->wtdb->tdb);
219 return 0;
222 /* start a transaction on a database */
223 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
225 struct db_record *rh;
226 TDB_DATA key;
227 TALLOC_CTX *tmp_ctx;
228 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
229 int ret;
230 struct db_ctdb_ctx *ctx = h->ctx;
231 TDB_DATA data;
233 key.dptr = (uint8_t *)discard_const(keyname);
234 key.dsize = strlen(keyname);
236 again:
237 tmp_ctx = talloc_new(h);
239 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
240 if (rh == NULL) {
241 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
242 talloc_free(tmp_ctx);
243 return -1;
245 talloc_free(rh);
247 ret = tdb_transaction_start(ctx->wtdb->tdb);
248 if (ret != 0) {
249 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
250 talloc_free(tmp_ctx);
251 return -1;
254 data = tdb_fetch(ctx->wtdb->tdb, key);
255 if ((data.dptr == NULL) ||
256 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
257 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
258 SAFE_FREE(data.dptr);
259 tdb_transaction_cancel(ctx->wtdb->tdb);
260 talloc_free(tmp_ctx);
261 goto again;
264 SAFE_FREE(data.dptr);
265 talloc_free(tmp_ctx);
267 return 0;
271 /* start a transaction on a database */
272 static int db_ctdb_transaction_start(struct db_context *db)
274 struct db_ctdb_transaction_handle *h;
275 int ret;
276 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
277 struct db_ctdb_ctx);
279 if (!db->persistent) {
280 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
281 ctx->db_id));
282 return -1;
285 if (ctx->transaction) {
286 ctx->transaction->nesting++;
287 return 0;
290 h = talloc_zero(db, struct db_ctdb_transaction_handle);
291 if (h == NULL) {
292 DEBUG(0,(__location__ " oom for transaction handle\n"));
293 return -1;
296 h->ctx = ctx;
298 ret = db_ctdb_transaction_fetch_start(h);
299 if (ret != 0) {
300 talloc_free(h);
301 return -1;
304 talloc_set_destructor(h, db_ctdb_transaction_destructor);
306 ctx->transaction = h;
308 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
310 return 0;
316 fetch a record inside a transaction
318 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
319 TALLOC_CTX *mem_ctx,
320 TDB_DATA key, TDB_DATA *data)
322 struct db_ctdb_transaction_handle *h = db->transaction;
324 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
326 if (data->dptr != NULL) {
327 uint8_t *oldptr = (uint8_t *)data->dptr;
328 data->dsize -= sizeof(struct ctdb_ltdb_header);
329 if (data->dsize == 0) {
330 data->dptr = NULL;
331 } else {
332 data->dptr = (uint8 *)
333 talloc_memdup(
334 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
335 data->dsize);
337 SAFE_FREE(oldptr);
338 if (data->dptr == NULL && data->dsize != 0) {
339 return -1;
343 if (!h->in_replay) {
344 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
345 if (h->m_all == NULL) {
346 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
347 data->dsize = 0;
348 talloc_free(data->dptr);
349 return -1;
353 return 0;
357 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
358 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
360 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
361 TALLOC_CTX *mem_ctx,
362 TDB_DATA key)
364 struct db_record *result;
365 TDB_DATA ctdb_data;
367 if (!(result = talloc(mem_ctx, struct db_record))) {
368 DEBUG(0, ("talloc failed\n"));
369 return NULL;
372 result->private_data = ctx->transaction;
374 result->key.dsize = key.dsize;
375 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
376 if (result->key.dptr == NULL) {
377 DEBUG(0, ("talloc failed\n"));
378 TALLOC_FREE(result);
379 return NULL;
382 result->store = db_ctdb_store_transaction;
383 result->delete_rec = db_ctdb_delete_transaction;
385 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
386 if (ctdb_data.dptr == NULL) {
387 /* create the record */
388 result->value = tdb_null;
389 return result;
392 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
393 result->value.dptr = NULL;
395 if ((result->value.dsize != 0)
396 && !(result->value.dptr = (uint8 *)talloc_memdup(
397 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
398 result->value.dsize))) {
399 DEBUG(0, ("talloc failed\n"));
400 TALLOC_FREE(result);
403 SAFE_FREE(ctdb_data.dptr);
405 return result;
408 static int db_ctdb_record_destructor(struct db_record *rec)
410 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
411 rec->private_data, struct db_ctdb_transaction_handle);
412 int ret = h->ctx->db->transaction_commit(h->ctx->db);
413 if (ret != 0) {
414 DEBUG(0,(__location__ " transaction_commit failed\n"));
416 return 0;
420 auto-create a transaction for persistent databases
422 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
423 TALLOC_CTX *mem_ctx,
424 TDB_DATA key)
426 int res;
427 struct db_record *rec;
429 res = db_ctdb_transaction_start(ctx->db);
430 if (res == -1) {
431 return NULL;
434 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
435 if (rec == NULL) {
436 ctx->db->transaction_cancel(ctx->db);
437 return NULL;
440 /* destroy this transaction when we release the lock */
441 talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
442 return rec;
447 stores a record inside a transaction
449 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
450 TDB_DATA key, TDB_DATA data)
452 TALLOC_CTX *tmp_ctx = talloc_new(h);
453 int ret;
454 TDB_DATA rec;
455 struct ctdb_ltdb_header header;
457 /* we need the header so we can update the RSN */
458 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
459 if (rec.dptr == NULL) {
460 /* the record doesn't exist - create one with us as dmaster.
461 This is only safe because we are in a transaction and this
462 is a persistent database */
463 ZERO_STRUCT(header);
464 header.dmaster = get_my_vnn();
465 } else {
466 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
467 rec.dsize -= sizeof(struct ctdb_ltdb_header);
468 /* a special case, we are writing the same data that is there now */
469 if (data.dsize == rec.dsize &&
470 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
471 SAFE_FREE(rec.dptr);
472 talloc_free(tmp_ctx);
473 return 0;
475 SAFE_FREE(rec.dptr);
478 header.rsn++;
480 if (!h->in_replay) {
481 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
482 if (h->m_all == NULL) {
483 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
484 talloc_free(tmp_ctx);
485 return -1;
489 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
490 if (h->m_write == NULL) {
491 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
492 talloc_free(tmp_ctx);
493 return -1;
496 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
497 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
498 if (rec.dptr == NULL) {
499 DEBUG(0,(__location__ " Failed to alloc record\n"));
500 talloc_free(tmp_ctx);
501 return -1;
503 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
504 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
506 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
508 talloc_free(tmp_ctx);
510 return ret;
515 a record store inside a transaction
517 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
519 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
520 rec->private_data, struct db_ctdb_transaction_handle);
521 int ret;
523 ret = db_ctdb_transaction_store(h, rec->key, data);
524 if (ret != 0) {
525 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
527 return NT_STATUS_OK;
531 a record delete inside a transaction
533 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
535 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
536 rec->private_data, struct db_ctdb_transaction_handle);
537 int ret;
539 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
540 if (ret != 0) {
541 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
543 return NT_STATUS_OK;
548 replay a transaction
550 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
552 int ret, i;
553 struct ctdb_rec_data *rec = NULL;
555 h->in_replay = true;
556 talloc_free(h->m_write);
557 h->m_write = NULL;
559 ret = db_ctdb_transaction_fetch_start(h);
560 if (ret != 0) {
561 return ret;
564 for (i=0;i<h->m_all->count;i++) {
565 TDB_DATA key, data;
567 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
568 if (rec == NULL) {
569 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
570 goto failed;
573 if (rec->reqid == 0) {
574 /* its a store */
575 if (db_ctdb_transaction_store(h, key, data) != 0) {
576 goto failed;
578 } else {
579 TDB_DATA data2;
580 TALLOC_CTX *tmp_ctx = talloc_new(h);
582 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
583 talloc_free(tmp_ctx);
584 goto failed;
586 if (data2.dsize != data.dsize ||
587 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
588 /* the record has changed on us - we have to give up */
589 talloc_free(tmp_ctx);
590 goto failed;
592 talloc_free(tmp_ctx);
596 return 0;
598 failed:
599 tdb_transaction_cancel(h->ctx->wtdb->tdb);
600 return -1;
605 commit a transaction
607 static int db_ctdb_transaction_commit(struct db_context *db)
609 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
610 struct db_ctdb_ctx);
611 NTSTATUS rets;
612 int ret;
613 int status;
614 int retries = 0;
615 struct db_ctdb_transaction_handle *h = ctx->transaction;
616 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
618 if (h == NULL) {
619 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
620 return -1;
623 if (h->nested_cancel) {
624 db->transaction_cancel(db);
625 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
626 return -1;
629 if (h->nesting != 0) {
630 h->nesting--;
631 return 0;
634 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
636 talloc_set_destructor(h, NULL);
638 /* our commit strategy is quite complex.
640 - we first try to commit the changes to all other nodes
642 - if that works, then we commit locally and we are done
644 - if a commit on another node fails, then we need to cancel
645 the transaction, then restart the transaction (thus
646 opening a window of time for a pending recovery to
647 complete), then replay the transaction, checking all the
648 reads and writes (checking that reads give the same data,
649 and writes succeed). Then we retry the transaction to the
650 other nodes
653 again:
654 if (h->m_write == NULL) {
655 /* no changes were made, potentially after a retry */
656 tdb_transaction_cancel(h->ctx->wtdb->tdb);
657 talloc_free(h);
658 ctx->transaction = NULL;
659 return 0;
662 /* tell ctdbd to commit to the other nodes */
663 rets = ctdbd_control_local(messaging_ctdbd_connection(),
664 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
665 h->ctx->db_id, 0,
666 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
667 if (!NT_STATUS_IS_OK(rets) || status != 0) {
668 tdb_transaction_cancel(h->ctx->wtdb->tdb);
669 sleep(1);
671 if (!NT_STATUS_IS_OK(rets)) {
672 failure_control = CTDB_CONTROL_TRANS2_ERROR;
673 } else {
674 /* work out what error code we will give if we
675 have to fail the operation */
676 switch ((enum ctdb_trans2_commit_error)status) {
677 case CTDB_TRANS2_COMMIT_SUCCESS:
678 case CTDB_TRANS2_COMMIT_SOMEFAIL:
679 case CTDB_TRANS2_COMMIT_TIMEOUT:
680 failure_control = CTDB_CONTROL_TRANS2_ERROR;
681 break;
682 case CTDB_TRANS2_COMMIT_ALLFAIL:
683 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
684 break;
688 if (++retries == 5) {
689 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
690 h->ctx->db_id, retries, (unsigned)failure_control));
691 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
692 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
693 tdb_null, NULL, NULL, NULL);
694 h->ctx->transaction = NULL;
695 talloc_free(h);
696 ctx->transaction = NULL;
697 return -1;
700 if (ctdb_replay_transaction(h) != 0) {
701 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
702 (unsigned)failure_control));
703 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
704 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
705 tdb_null, NULL, NULL, NULL);
706 h->ctx->transaction = NULL;
707 talloc_free(h);
708 ctx->transaction = NULL;
709 return -1;
711 goto again;
712 } else {
713 failure_control = CTDB_CONTROL_TRANS2_ERROR;
716 /* do the real commit locally */
717 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
718 if (ret != 0) {
719 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
720 (unsigned)failure_control));
721 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
722 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
723 h->ctx->transaction = NULL;
724 talloc_free(h);
725 return ret;
728 /* tell ctdbd that we are finished with our local commit */
729 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
730 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
731 tdb_null, NULL, NULL, NULL);
732 h->ctx->transaction = NULL;
733 talloc_free(h);
734 return 0;
739 cancel a transaction
741 static int db_ctdb_transaction_cancel(struct db_context *db)
743 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
744 struct db_ctdb_ctx);
745 struct db_ctdb_transaction_handle *h = ctx->transaction;
747 if (h == NULL) {
748 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
749 return -1;
752 if (h->nesting != 0) {
753 h->nesting--;
754 h->nested_cancel = true;
755 return 0;
758 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
760 ctx->transaction = NULL;
761 talloc_free(h);
762 return 0;
766 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
768 struct db_ctdb_rec *crec = talloc_get_type_abort(
769 rec->private_data, struct db_ctdb_rec);
770 TDB_DATA cdata;
771 int ret;
773 cdata.dsize = sizeof(crec->header) + data.dsize;
775 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
776 return NT_STATUS_NO_MEMORY;
779 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
780 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
782 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
784 SAFE_FREE(cdata.dptr);
786 return (ret == 0) ? NT_STATUS_OK
787 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
792 static NTSTATUS db_ctdb_delete(struct db_record *rec)
794 TDB_DATA data;
797 * We have to store the header with empty data. TODO: Fix the
798 * tdb-level cleanup
801 ZERO_STRUCT(data);
803 return db_ctdb_store(rec, data, 0);
807 static int db_ctdb_record_destr(struct db_record* data)
809 struct db_ctdb_rec *crec = talloc_get_type_abort(
810 data->private_data, struct db_ctdb_rec);
812 DEBUG(10, (DEBUGLEVEL > 10
813 ? "Unlocking db %u key %s\n"
814 : "Unlocking db %u key %.20s\n",
815 (int)crec->ctdb_ctx->db_id,
816 hex_encode(data, (unsigned char *)data->key.dptr,
817 data->key.dsize)));
819 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
820 DEBUG(0, ("tdb_chainunlock failed\n"));
821 return -1;
824 return 0;
827 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
828 TALLOC_CTX *mem_ctx,
829 TDB_DATA key,
830 bool persistent)
832 struct db_record *result;
833 struct db_ctdb_rec *crec;
834 NTSTATUS status;
835 TDB_DATA ctdb_data;
836 int migrate_attempts = 0;
838 if (!(result = talloc(mem_ctx, struct db_record))) {
839 DEBUG(0, ("talloc failed\n"));
840 return NULL;
843 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
844 DEBUG(0, ("talloc failed\n"));
845 TALLOC_FREE(result);
846 return NULL;
849 result->private_data = (void *)crec;
850 crec->ctdb_ctx = ctx;
852 result->key.dsize = key.dsize;
853 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
854 if (result->key.dptr == NULL) {
855 DEBUG(0, ("talloc failed\n"));
856 TALLOC_FREE(result);
857 return NULL;
861 * Do a blocking lock on the record
863 again:
865 if (DEBUGLEVEL >= 10) {
866 char *keystr = hex_encode(result, key.dptr, key.dsize);
867 DEBUG(10, (DEBUGLEVEL > 10
868 ? "Locking db %u key %s\n"
869 : "Locking db %u key %.20s\n",
870 (int)crec->ctdb_ctx->db_id, keystr));
871 TALLOC_FREE(keystr);
874 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
875 DEBUG(3, ("tdb_chainlock failed\n"));
876 TALLOC_FREE(result);
877 return NULL;
880 result->store = db_ctdb_store;
881 result->delete_rec = db_ctdb_delete;
882 talloc_set_destructor(result, db_ctdb_record_destr);
884 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
887 * See if we have a valid record and we are the dmaster. If so, we can
888 * take the shortcut and just return it.
891 if ((ctdb_data.dptr == NULL) ||
892 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
893 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
894 #if 0
895 || (random() % 2 != 0)
896 #endif
898 SAFE_FREE(ctdb_data.dptr);
899 tdb_chainunlock(ctx->wtdb->tdb, key);
900 talloc_set_destructor(result, NULL);
902 migrate_attempts += 1;
904 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
905 ctdb_data.dptr, ctdb_data.dptr ?
906 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
907 get_my_vnn()));
909 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
910 if (!NT_STATUS_IS_OK(status)) {
911 DEBUG(5, ("ctdb_migrate failed: %s\n",
912 nt_errstr(status)));
913 TALLOC_FREE(result);
914 return NULL;
916 /* now its migrated, try again */
917 goto again;
920 if (migrate_attempts > 10) {
921 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
922 migrate_attempts));
925 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
927 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
928 result->value.dptr = NULL;
930 if ((result->value.dsize != 0)
931 && !(result->value.dptr = (uint8 *)talloc_memdup(
932 result, ctdb_data.dptr + sizeof(crec->header),
933 result->value.dsize))) {
934 DEBUG(0, ("talloc failed\n"));
935 TALLOC_FREE(result);
938 SAFE_FREE(ctdb_data.dptr);
940 return result;
943 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
944 TALLOC_CTX *mem_ctx,
945 TDB_DATA key)
947 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
948 struct db_ctdb_ctx);
950 if (ctx->transaction != NULL) {
951 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
954 if (db->persistent) {
955 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
958 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
962 fetch (unlocked, no migration) operation on ctdb
964 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
965 TDB_DATA key, TDB_DATA *data)
967 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
968 struct db_ctdb_ctx);
969 NTSTATUS status;
970 TDB_DATA ctdb_data;
972 if (ctx->transaction) {
973 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
976 /* try a direct fetch */
977 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
980 * See if we have a valid record and we are the dmaster. If so, we can
981 * take the shortcut and just return it.
982 * we bypass the dmaster check for persistent databases
984 if ((ctdb_data.dptr != NULL) &&
985 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
986 (db->persistent ||
987 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
988 /* we are the dmaster - avoid the ctdb protocol op */
990 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
991 if (data->dsize == 0) {
992 SAFE_FREE(ctdb_data.dptr);
993 data->dptr = NULL;
994 return 0;
997 data->dptr = (uint8 *)talloc_memdup(
998 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
999 data->dsize);
1001 SAFE_FREE(ctdb_data.dptr);
1003 if (data->dptr == NULL) {
1004 return -1;
1006 return 0;
1009 SAFE_FREE(ctdb_data.dptr);
1011 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1012 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1013 if (!NT_STATUS_IS_OK(status)) {
1014 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1015 return -1;
1018 return 0;
/*
 * Arguments of a traverse call, handed to the traverse callbacks
 * through their private_data pointer.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* function to call for every record */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed on to fn */
	void *private_data;
};
1027 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1029 struct traverse_state *state = (struct traverse_state *)private_data;
1030 struct db_record *rec;
1031 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1032 /* we have to give them a locked record to prevent races */
1033 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1034 if (rec && rec->value.dsize > 0) {
1035 state->fn(rec, state->private_data);
1037 talloc_free(tmp_ctx);
1040 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1041 void *private_data)
1043 struct traverse_state *state = (struct traverse_state *)private_data;
1044 struct db_record *rec;
1045 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1046 int ret = 0;
1047 /* we have to give them a locked record to prevent races */
1048 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1049 if (rec && rec->value.dsize > 0) {
1050 ret = state->fn(rec, state->private_data);
1052 talloc_free(tmp_ctx);
1053 return ret;
1056 static int db_ctdb_traverse(struct db_context *db,
1057 int (*fn)(struct db_record *rec,
1058 void *private_data),
1059 void *private_data)
1061 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1062 struct db_ctdb_ctx);
1063 struct traverse_state state;
1065 state.db = db;
1066 state.fn = fn;
1067 state.private_data = private_data;
1069 if (db->persistent) {
1070 /* for persistent databases we don't need to do a ctdb traverse,
1071 we can do a faster local traverse */
1072 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1076 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1077 return 0;
1080 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1082 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1085 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1087 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1090 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1092 struct traverse_state *state = (struct traverse_state *)private_data;
1093 struct db_record rec;
1094 rec.key = key;
1095 rec.value = data;
1096 rec.store = db_ctdb_store_deny;
1097 rec.delete_rec = db_ctdb_delete_deny;
1098 rec.private_data = state->db;
1099 state->fn(&rec, state->private_data);
1102 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1103 void *private_data)
1105 struct traverse_state *state = (struct traverse_state *)private_data;
1106 struct db_record rec;
1107 rec.key = kbuf;
1108 rec.value = dbuf;
1109 rec.store = db_ctdb_store_deny;
1110 rec.delete_rec = db_ctdb_delete_deny;
1111 rec.private_data = state->db;
1113 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1114 /* a deleted record */
1115 return 0;
1117 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1118 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1120 return state->fn(&rec, state->private_data);
1123 static int db_ctdb_traverse_read(struct db_context *db,
1124 int (*fn)(struct db_record *rec,
1125 void *private_data),
1126 void *private_data)
1128 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1129 struct db_ctdb_ctx);
1130 struct traverse_state state;
1132 state.db = db;
1133 state.fn = fn;
1134 state.private_data = private_data;
1136 if (db->persistent) {
1137 /* for persistent databases we don't need to do a ctdb traverse,
1138 we can do a faster local traverse */
1139 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1142 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1143 return 0;
1146 static int db_ctdb_get_seqnum(struct db_context *db)
1148 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1149 struct db_ctdb_ctx);
1150 return tdb_get_seqnum(ctx->wtdb->tdb);
1153 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1154 const char *name,
1155 int hash_size, int tdb_flags,
1156 int open_flags, mode_t mode)
1158 struct db_context *result;
1159 struct db_ctdb_ctx *db_ctdb;
1160 char *db_path;
1162 if (!lp_clustering()) {
1163 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1164 return NULL;
1167 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1168 DEBUG(0, ("talloc failed\n"));
1169 TALLOC_FREE(result);
1170 return NULL;
1173 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1174 DEBUG(0, ("talloc failed\n"));
1175 TALLOC_FREE(result);
1176 return NULL;
1179 db_ctdb->transaction = NULL;
1180 db_ctdb->db = result;
1182 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1183 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1184 TALLOC_FREE(result);
1185 return NULL;
1188 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1190 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1192 /* only pass through specific flags */
1193 tdb_flags &= TDB_SEQNUM;
1195 /* honor permissions if user has specified O_CREAT */
1196 if (open_flags & O_CREAT) {
1197 chmod(db_path, mode);
1200 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1201 if (db_ctdb->wtdb == NULL) {
1202 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1203 TALLOC_FREE(result);
1204 return NULL;
1206 talloc_free(db_path);
1208 result->private_data = (void *)db_ctdb;
1209 result->fetch_locked = db_ctdb_fetch_locked;
1210 result->fetch = db_ctdb_fetch;
1211 result->traverse = db_ctdb_traverse;
1212 result->traverse_read = db_ctdb_traverse_read;
1213 result->get_seqnum = db_ctdb_get_seqnum;
1214 result->transaction_start = db_ctdb_transaction_start;
1215 result->transaction_commit = db_ctdb_transaction_commit;
1216 result->transaction_cancel = db_ctdb_transaction_cancel;
1218 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1219 name, db_ctdb->db_id));
1221 return result;
1223 #endif