s3:dbwrap_ctdb: reformat a comment slightly to enhance clearness.
[Samba/gebeck_regimport.git] / source3 / lib / dbwrap_ctdb.c
blob4dd9465c5f13fdd48066ad206accb0a52ee19d98
1 /*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * Per-database state of one open transaction.
 */
struct db_ctdb_transaction_handle {
	/* database context this transaction belongs to */
	struct db_ctdb_ctx *ctx;
	/* true while ctdb_replay_transaction() is re-running the log */
	bool in_replay;
	/*
	 * The reads and writes done under a transaction are logged in
	 * two marshall buffers:
	 *  - m_all   records both reads and writes,
	 *  - m_write records only the writes.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* number of nested transaction_start calls still outstanding */
	uint32_t nesting;
	/* set when a nested level called transaction_cancel */
	bool nested_cancel;
};
40 struct db_ctdb_ctx {
41 struct db_context *db;
42 struct tdb_wrap *wtdb;
43 uint32 db_id;
44 struct db_ctdb_transaction_handle *transaction;
47 struct db_ctdb_rec {
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 TALLOC_CTX *mem_ctx,
54 TDB_DATA key,
55 bool persistent);
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
59 NTSTATUS status;
60 enum TDB_ERROR tret = tdb_error(tdb);
62 switch (tret) {
63 case TDB_ERR_EXISTS:
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 break;
66 case TDB_ERR_NOEXIST:
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 break;
69 default:
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71 break;
74 return status;
80 form a ctdb_rec_data record from a key/data pair
82 note that header may be NULL. If not NULL then it is included in the data portion
83 of the record
85 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
86 TDB_DATA key,
87 struct ctdb_ltdb_header *header,
88 TDB_DATA data)
90 size_t length;
91 struct ctdb_rec_data *d;
93 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
94 data.dsize + (header?sizeof(*header):0);
95 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
96 if (d == NULL) {
97 return NULL;
99 d->length = length;
100 d->reqid = reqid;
101 d->keylen = key.dsize;
102 memcpy(&d->data[0], key.dptr, key.dsize);
103 if (header) {
104 d->datalen = data.dsize + sizeof(*header);
105 memcpy(&d->data[key.dsize], header, sizeof(*header));
106 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
107 } else {
108 d->datalen = data.dsize;
109 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
111 return d;
115 /* helper function for marshalling multiple records */
116 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
117 struct ctdb_marshall_buffer *m,
118 uint64_t db_id,
119 uint32_t reqid,
120 TDB_DATA key,
121 struct ctdb_ltdb_header *header,
122 TDB_DATA data)
124 struct ctdb_rec_data *r;
125 size_t m_size, r_size;
126 struct ctdb_marshall_buffer *m2 = NULL;
128 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
129 if (r == NULL) {
130 talloc_free(m);
131 return NULL;
134 if (m == NULL) {
135 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
136 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
137 if (m == NULL) {
138 goto done;
140 m->db_id = db_id;
143 m_size = talloc_get_size(m);
144 r_size = talloc_get_size(r);
146 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
147 mem_ctx, m, m_size + r_size);
148 if (m2 == NULL) {
149 talloc_free(m);
150 goto done;
153 memcpy(m_size + (uint8_t *)m2, r, r_size);
155 m2->count++;
157 done:
158 talloc_free(r);
159 return m2;
162 /* we've finished marshalling, return a data blob with the marshalled records */
163 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
165 TDB_DATA data;
166 data.dptr = (uint8_t *)m;
167 data.dsize = talloc_get_size(m);
168 return data;
172 loop over a marshalling buffer
174 - pass r==NULL to start
175 - loop the number of times indicated by m->count
177 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
178 uint32_t *reqid,
179 struct ctdb_ltdb_header *header,
180 TDB_DATA *key, TDB_DATA *data)
182 if (r == NULL) {
183 r = (struct ctdb_rec_data *)&m->data[0];
184 } else {
185 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
188 if (reqid != NULL) {
189 *reqid = r->reqid;
192 if (key != NULL) {
193 key->dptr = &r->data[0];
194 key->dsize = r->keylen;
196 if (data != NULL) {
197 data->dptr = &r->data[r->keylen];
198 data->dsize = r->datalen;
199 if (header != NULL) {
200 data->dptr += sizeof(*header);
201 data->dsize -= sizeof(*header);
205 if (header != NULL) {
206 if (r->datalen < sizeof(*header)) {
207 return NULL;
209 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
212 return r;
218 * CTDB transaction destructor
220 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
222 tdb_transaction_cancel(h->ctx->wtdb->tdb);
223 return 0;
227 * start a transaction on a ctdb database:
228 * - lock the transaction lock key
229 * - start the tdb transaction
231 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
233 struct db_record *rh;
234 TDB_DATA key;
235 TALLOC_CTX *tmp_ctx;
236 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
237 int ret;
238 struct db_ctdb_ctx *ctx = h->ctx;
239 TDB_DATA data;
241 key.dptr = (uint8_t *)discard_const(keyname);
242 key.dsize = strlen(keyname);
244 again:
245 tmp_ctx = talloc_new(h);
247 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
248 if (rh == NULL) {
249 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
250 talloc_free(tmp_ctx);
251 return -1;
253 talloc_free(rh);
255 ret = tdb_transaction_start(ctx->wtdb->tdb);
256 if (ret != 0) {
257 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
258 talloc_free(tmp_ctx);
259 return -1;
262 data = tdb_fetch(ctx->wtdb->tdb, key);
263 if ((data.dptr == NULL) ||
264 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
265 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
266 SAFE_FREE(data.dptr);
267 tdb_transaction_cancel(ctx->wtdb->tdb);
268 talloc_free(tmp_ctx);
269 goto again;
272 SAFE_FREE(data.dptr);
273 talloc_free(tmp_ctx);
275 return 0;
280 * CTDB dbwrap API: transaction_start function
281 * starts a transaction on a persistent database
283 static int db_ctdb_transaction_start(struct db_context *db)
285 struct db_ctdb_transaction_handle *h;
286 int ret;
287 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
288 struct db_ctdb_ctx);
290 if (!db->persistent) {
291 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
292 ctx->db_id));
293 return -1;
296 if (ctx->transaction) {
297 ctx->transaction->nesting++;
298 return 0;
301 h = talloc_zero(db, struct db_ctdb_transaction_handle);
302 if (h == NULL) {
303 DEBUG(0,(__location__ " oom for transaction handle\n"));
304 return -1;
307 h->ctx = ctx;
309 ret = db_ctdb_transaction_fetch_start(h);
310 if (ret != 0) {
311 talloc_free(h);
312 return -1;
315 talloc_set_destructor(h, db_ctdb_transaction_destructor);
317 ctx->transaction = h;
319 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
321 return 0;
327 fetch a record inside a transaction
329 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
330 TALLOC_CTX *mem_ctx,
331 TDB_DATA key, TDB_DATA *data)
333 struct db_ctdb_transaction_handle *h = db->transaction;
335 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
337 if (data->dptr != NULL) {
338 uint8_t *oldptr = (uint8_t *)data->dptr;
339 data->dsize -= sizeof(struct ctdb_ltdb_header);
340 if (data->dsize == 0) {
341 data->dptr = NULL;
342 } else {
343 data->dptr = (uint8 *)
344 talloc_memdup(
345 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
346 data->dsize);
348 SAFE_FREE(oldptr);
349 if (data->dptr == NULL && data->dsize != 0) {
350 return -1;
354 if (!h->in_replay) {
355 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
356 if (h->m_all == NULL) {
357 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
358 data->dsize = 0;
359 talloc_free(data->dptr);
360 return -1;
364 return 0;
368 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
369 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
371 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
372 TALLOC_CTX *mem_ctx,
373 TDB_DATA key)
375 struct db_record *result;
376 TDB_DATA ctdb_data;
378 if (!(result = talloc(mem_ctx, struct db_record))) {
379 DEBUG(0, ("talloc failed\n"));
380 return NULL;
383 result->private_data = ctx->transaction;
385 result->key.dsize = key.dsize;
386 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
387 if (result->key.dptr == NULL) {
388 DEBUG(0, ("talloc failed\n"));
389 TALLOC_FREE(result);
390 return NULL;
393 result->store = db_ctdb_store_transaction;
394 result->delete_rec = db_ctdb_delete_transaction;
396 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
397 if (ctdb_data.dptr == NULL) {
398 /* create the record */
399 result->value = tdb_null;
400 return result;
403 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
404 result->value.dptr = NULL;
406 if ((result->value.dsize != 0)
407 && !(result->value.dptr = (uint8 *)talloc_memdup(
408 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
409 result->value.dsize))) {
410 DEBUG(0, ("talloc failed\n"));
411 TALLOC_FREE(result);
414 SAFE_FREE(ctdb_data.dptr);
416 return result;
419 static int db_ctdb_record_destructor(struct db_record **recp)
421 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
422 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
423 rec->private_data, struct db_ctdb_transaction_handle);
424 int ret = h->ctx->db->transaction_commit(h->ctx->db);
425 if (ret != 0) {
426 DEBUG(0,(__location__ " transaction_commit failed\n"));
428 return 0;
432 auto-create a transaction for persistent databases
434 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
435 TALLOC_CTX *mem_ctx,
436 TDB_DATA key)
438 int res;
439 struct db_record *rec, **recp;
441 res = db_ctdb_transaction_start(ctx->db);
442 if (res == -1) {
443 return NULL;
446 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
447 if (rec == NULL) {
448 ctx->db->transaction_cancel(ctx->db);
449 return NULL;
452 /* destroy this transaction when we release the lock */
453 recp = talloc(rec, struct db_record *);
454 if (recp == NULL) {
455 ctx->db->transaction_cancel(ctx->db);
456 talloc_free(rec);
457 return NULL;
459 *recp = rec;
460 talloc_set_destructor(recp, db_ctdb_record_destructor);
461 return rec;
466 stores a record inside a transaction
468 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
469 TDB_DATA key, TDB_DATA data)
471 TALLOC_CTX *tmp_ctx = talloc_new(h);
472 int ret;
473 TDB_DATA rec;
474 struct ctdb_ltdb_header header;
476 /* we need the header so we can update the RSN */
477 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
478 if (rec.dptr == NULL) {
479 /* the record doesn't exist - create one with us as dmaster.
480 This is only safe because we are in a transaction and this
481 is a persistent database */
482 ZERO_STRUCT(header);
483 } else {
484 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
485 rec.dsize -= sizeof(struct ctdb_ltdb_header);
486 /* a special case, we are writing the same data that is there now */
487 if (data.dsize == rec.dsize &&
488 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
489 SAFE_FREE(rec.dptr);
490 talloc_free(tmp_ctx);
491 return 0;
493 SAFE_FREE(rec.dptr);
496 header.dmaster = get_my_vnn();
497 header.rsn++;
499 if (!h->in_replay) {
500 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
501 if (h->m_all == NULL) {
502 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
503 talloc_free(tmp_ctx);
504 return -1;
508 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
509 if (h->m_write == NULL) {
510 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
511 talloc_free(tmp_ctx);
512 return -1;
515 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
516 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
517 if (rec.dptr == NULL) {
518 DEBUG(0,(__location__ " Failed to alloc record\n"));
519 talloc_free(tmp_ctx);
520 return -1;
522 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
523 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
525 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
527 talloc_free(tmp_ctx);
529 return ret;
534 a record store inside a transaction
536 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
538 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
539 rec->private_data, struct db_ctdb_transaction_handle);
540 int ret;
542 ret = db_ctdb_transaction_store(h, rec->key, data);
543 if (ret != 0) {
544 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
546 return NT_STATUS_OK;
550 a record delete inside a transaction
552 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
554 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
555 rec->private_data, struct db_ctdb_transaction_handle);
556 int ret;
558 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
559 if (ret != 0) {
560 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
562 return NT_STATUS_OK;
567 replay a transaction
569 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
571 int ret, i;
572 struct ctdb_rec_data *rec = NULL;
574 h->in_replay = true;
575 talloc_free(h->m_write);
576 h->m_write = NULL;
578 ret = db_ctdb_transaction_fetch_start(h);
579 if (ret != 0) {
580 return ret;
583 for (i=0;i<h->m_all->count;i++) {
584 TDB_DATA key, data;
586 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
587 if (rec == NULL) {
588 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
589 goto failed;
592 if (rec->reqid == 0) {
593 /* its a store */
594 if (db_ctdb_transaction_store(h, key, data) != 0) {
595 goto failed;
597 } else {
598 TDB_DATA data2;
599 TALLOC_CTX *tmp_ctx = talloc_new(h);
601 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
602 talloc_free(tmp_ctx);
603 goto failed;
605 if (data2.dsize != data.dsize ||
606 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
607 /* the record has changed on us - we have to give up */
608 talloc_free(tmp_ctx);
609 goto failed;
611 talloc_free(tmp_ctx);
615 return 0;
617 failed:
618 tdb_transaction_cancel(h->ctx->wtdb->tdb);
619 return -1;
624 commit a transaction
626 static int db_ctdb_transaction_commit(struct db_context *db)
628 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
629 struct db_ctdb_ctx);
630 NTSTATUS rets;
631 int ret;
632 int status;
633 int retries = 0;
634 struct db_ctdb_transaction_handle *h = ctx->transaction;
635 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
637 if (h == NULL) {
638 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
639 return -1;
642 if (h->nested_cancel) {
643 db->transaction_cancel(db);
644 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
645 return -1;
648 if (h->nesting != 0) {
649 h->nesting--;
650 return 0;
653 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
655 talloc_set_destructor(h, NULL);
657 /* our commit strategy is quite complex.
659 - we first try to commit the changes to all other nodes
661 - if that works, then we commit locally and we are done
663 - if a commit on another node fails, then we need to cancel
664 the transaction, then restart the transaction (thus
665 opening a window of time for a pending recovery to
666 complete), then replay the transaction, checking all the
667 reads and writes (checking that reads give the same data,
668 and writes succeed). Then we retry the transaction to the
669 other nodes
672 again:
673 if (h->m_write == NULL) {
674 /* no changes were made, potentially after a retry */
675 tdb_transaction_cancel(h->ctx->wtdb->tdb);
676 talloc_free(h);
677 ctx->transaction = NULL;
678 return 0;
681 /* tell ctdbd to commit to the other nodes */
682 rets = ctdbd_control_local(messaging_ctdbd_connection(),
683 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
684 h->ctx->db_id, 0,
685 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
686 if (!NT_STATUS_IS_OK(rets) || status != 0) {
687 tdb_transaction_cancel(h->ctx->wtdb->tdb);
688 sleep(1);
690 if (!NT_STATUS_IS_OK(rets)) {
691 failure_control = CTDB_CONTROL_TRANS2_ERROR;
692 } else {
693 /* work out what error code we will give if we
694 have to fail the operation */
695 switch ((enum ctdb_trans2_commit_error)status) {
696 case CTDB_TRANS2_COMMIT_SUCCESS:
697 case CTDB_TRANS2_COMMIT_SOMEFAIL:
698 case CTDB_TRANS2_COMMIT_TIMEOUT:
699 failure_control = CTDB_CONTROL_TRANS2_ERROR;
700 break;
701 case CTDB_TRANS2_COMMIT_ALLFAIL:
702 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
703 break;
707 if (++retries == 5) {
708 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
709 h->ctx->db_id, retries, (unsigned)failure_control));
710 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
711 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
712 tdb_null, NULL, NULL, NULL);
713 h->ctx->transaction = NULL;
714 talloc_free(h);
715 ctx->transaction = NULL;
716 return -1;
719 if (ctdb_replay_transaction(h) != 0) {
720 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
721 (unsigned)failure_control));
722 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
723 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
724 tdb_null, NULL, NULL, NULL);
725 h->ctx->transaction = NULL;
726 talloc_free(h);
727 ctx->transaction = NULL;
728 return -1;
730 goto again;
731 } else {
732 failure_control = CTDB_CONTROL_TRANS2_ERROR;
735 /* do the real commit locally */
736 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
737 if (ret != 0) {
738 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
739 (unsigned)failure_control));
740 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
741 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
742 h->ctx->transaction = NULL;
743 talloc_free(h);
744 return ret;
747 /* tell ctdbd that we are finished with our local commit */
748 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
749 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
750 tdb_null, NULL, NULL, NULL);
751 h->ctx->transaction = NULL;
752 talloc_free(h);
753 return 0;
758 cancel a transaction
760 static int db_ctdb_transaction_cancel(struct db_context *db)
762 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
763 struct db_ctdb_ctx);
764 struct db_ctdb_transaction_handle *h = ctx->transaction;
766 if (h == NULL) {
767 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
768 return -1;
771 if (h->nesting != 0) {
772 h->nesting--;
773 h->nested_cancel = true;
774 return 0;
777 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
779 ctx->transaction = NULL;
780 talloc_free(h);
781 return 0;
785 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
787 struct db_ctdb_rec *crec = talloc_get_type_abort(
788 rec->private_data, struct db_ctdb_rec);
789 TDB_DATA cdata;
790 int ret;
792 cdata.dsize = sizeof(crec->header) + data.dsize;
794 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
795 return NT_STATUS_NO_MEMORY;
798 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
799 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
801 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
803 SAFE_FREE(cdata.dptr);
805 return (ret == 0) ? NT_STATUS_OK
806 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
811 static NTSTATUS db_ctdb_delete(struct db_record *rec)
813 TDB_DATA data;
816 * We have to store the header with empty data. TODO: Fix the
817 * tdb-level cleanup
820 ZERO_STRUCT(data);
822 return db_ctdb_store(rec, data, 0);
826 static int db_ctdb_record_destr(struct db_record* data)
828 struct db_ctdb_rec *crec = talloc_get_type_abort(
829 data->private_data, struct db_ctdb_rec);
831 DEBUG(10, (DEBUGLEVEL > 10
832 ? "Unlocking db %u key %s\n"
833 : "Unlocking db %u key %.20s\n",
834 (int)crec->ctdb_ctx->db_id,
835 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
836 data->key.dsize)));
838 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
839 DEBUG(0, ("tdb_chainunlock failed\n"));
840 return -1;
843 return 0;
846 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
847 TALLOC_CTX *mem_ctx,
848 TDB_DATA key,
849 bool persistent)
851 struct db_record *result;
852 struct db_ctdb_rec *crec;
853 NTSTATUS status;
854 TDB_DATA ctdb_data;
855 int migrate_attempts = 0;
857 if (!(result = talloc(mem_ctx, struct db_record))) {
858 DEBUG(0, ("talloc failed\n"));
859 return NULL;
862 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
863 DEBUG(0, ("talloc failed\n"));
864 TALLOC_FREE(result);
865 return NULL;
868 result->private_data = (void *)crec;
869 crec->ctdb_ctx = ctx;
871 result->key.dsize = key.dsize;
872 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
873 if (result->key.dptr == NULL) {
874 DEBUG(0, ("talloc failed\n"));
875 TALLOC_FREE(result);
876 return NULL;
880 * Do a blocking lock on the record
882 again:
884 if (DEBUGLEVEL >= 10) {
885 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
886 DEBUG(10, (DEBUGLEVEL > 10
887 ? "Locking db %u key %s\n"
888 : "Locking db %u key %.20s\n",
889 (int)crec->ctdb_ctx->db_id, keystr));
890 TALLOC_FREE(keystr);
893 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
894 DEBUG(3, ("tdb_chainlock failed\n"));
895 TALLOC_FREE(result);
896 return NULL;
899 result->store = db_ctdb_store;
900 result->delete_rec = db_ctdb_delete;
901 talloc_set_destructor(result, db_ctdb_record_destr);
903 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
906 * See if we have a valid record and we are the dmaster. If so, we can
907 * take the shortcut and just return it.
910 if ((ctdb_data.dptr == NULL) ||
911 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
912 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
913 #if 0
914 || (random() % 2 != 0)
915 #endif
917 SAFE_FREE(ctdb_data.dptr);
918 tdb_chainunlock(ctx->wtdb->tdb, key);
919 talloc_set_destructor(result, NULL);
921 migrate_attempts += 1;
923 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
924 ctdb_data.dptr, ctdb_data.dptr ?
925 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
926 get_my_vnn()));
928 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
929 if (!NT_STATUS_IS_OK(status)) {
930 DEBUG(5, ("ctdb_migrate failed: %s\n",
931 nt_errstr(status)));
932 TALLOC_FREE(result);
933 return NULL;
935 /* now its migrated, try again */
936 goto again;
939 if (migrate_attempts > 10) {
940 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
941 migrate_attempts));
944 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
946 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
947 result->value.dptr = NULL;
949 if ((result->value.dsize != 0)
950 && !(result->value.dptr = (uint8 *)talloc_memdup(
951 result, ctdb_data.dptr + sizeof(crec->header),
952 result->value.dsize))) {
953 DEBUG(0, ("talloc failed\n"));
954 TALLOC_FREE(result);
957 SAFE_FREE(ctdb_data.dptr);
959 return result;
962 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
963 TALLOC_CTX *mem_ctx,
964 TDB_DATA key)
966 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
967 struct db_ctdb_ctx);
969 if (ctx->transaction != NULL) {
970 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
973 if (db->persistent) {
974 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
977 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
981 fetch (unlocked, no migration) operation on ctdb
983 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
984 TDB_DATA key, TDB_DATA *data)
986 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
987 struct db_ctdb_ctx);
988 NTSTATUS status;
989 TDB_DATA ctdb_data;
991 if (ctx->transaction) {
992 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
995 /* try a direct fetch */
996 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
999 * See if we have a valid record and we are the dmaster. If so, we can
1000 * take the shortcut and just return it.
1001 * we bypass the dmaster check for persistent databases
1003 if ((ctdb_data.dptr != NULL) &&
1004 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1005 (db->persistent ||
1006 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1007 /* we are the dmaster - avoid the ctdb protocol op */
1009 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1010 if (data->dsize == 0) {
1011 SAFE_FREE(ctdb_data.dptr);
1012 data->dptr = NULL;
1013 return 0;
1016 data->dptr = (uint8 *)talloc_memdup(
1017 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1018 data->dsize);
1020 SAFE_FREE(ctdb_data.dptr);
1022 if (data->dptr == NULL) {
1023 return -1;
1025 return 0;
1028 SAFE_FREE(ctdb_data.dptr);
1030 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1031 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1032 if (!NT_STATUS_IS_OK(status)) {
1033 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1034 return -1;
1037 return 0;
/*
 * Context passed through the tdb/ctdbd traverse callbacks.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller's per-record callback */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer handed back to fn */
	void *private_data;
};
1046 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1048 struct traverse_state *state = (struct traverse_state *)private_data;
1049 struct db_record *rec;
1050 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1051 /* we have to give them a locked record to prevent races */
1052 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1053 if (rec && rec->value.dsize > 0) {
1054 state->fn(rec, state->private_data);
1056 talloc_free(tmp_ctx);
1059 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1060 void *private_data)
1062 struct traverse_state *state = (struct traverse_state *)private_data;
1063 struct db_record *rec;
1064 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1065 int ret = 0;
1066 /* we have to give them a locked record to prevent races */
1067 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1068 if (rec && rec->value.dsize > 0) {
1069 ret = state->fn(rec, state->private_data);
1071 talloc_free(tmp_ctx);
1072 return ret;
1075 static int db_ctdb_traverse(struct db_context *db,
1076 int (*fn)(struct db_record *rec,
1077 void *private_data),
1078 void *private_data)
1080 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1081 struct db_ctdb_ctx);
1082 struct traverse_state state;
1084 state.db = db;
1085 state.fn = fn;
1086 state.private_data = private_data;
1088 if (db->persistent) {
1089 /* for persistent databases we don't need to do a ctdb traverse,
1090 we can do a faster local traverse */
1091 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1095 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1096 return 0;
1099 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1101 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1104 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1106 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1109 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1111 struct traverse_state *state = (struct traverse_state *)private_data;
1112 struct db_record rec;
1113 rec.key = key;
1114 rec.value = data;
1115 rec.store = db_ctdb_store_deny;
1116 rec.delete_rec = db_ctdb_delete_deny;
1117 rec.private_data = state->db;
1118 state->fn(&rec, state->private_data);
1121 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1122 void *private_data)
1124 struct traverse_state *state = (struct traverse_state *)private_data;
1125 struct db_record rec;
1126 rec.key = kbuf;
1127 rec.value = dbuf;
1128 rec.store = db_ctdb_store_deny;
1129 rec.delete_rec = db_ctdb_delete_deny;
1130 rec.private_data = state->db;
1132 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1133 /* a deleted record */
1134 return 0;
1136 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1137 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1139 return state->fn(&rec, state->private_data);
1142 static int db_ctdb_traverse_read(struct db_context *db,
1143 int (*fn)(struct db_record *rec,
1144 void *private_data),
1145 void *private_data)
1147 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1148 struct db_ctdb_ctx);
1149 struct traverse_state state;
1151 state.db = db;
1152 state.fn = fn;
1153 state.private_data = private_data;
1155 if (db->persistent) {
1156 /* for persistent databases we don't need to do a ctdb traverse,
1157 we can do a faster local traverse */
1158 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1161 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1162 return 0;
1165 static int db_ctdb_get_seqnum(struct db_context *db)
1167 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1168 struct db_ctdb_ctx);
1169 return tdb_get_seqnum(ctx->wtdb->tdb);
1172 static int db_ctdb_get_flags(struct db_context *db)
1174 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1175 struct db_ctdb_ctx);
1176 return tdb_get_flags(ctx->wtdb->tdb);
1179 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1180 const char *name,
1181 int hash_size, int tdb_flags,
1182 int open_flags, mode_t mode)
1184 struct db_context *result;
1185 struct db_ctdb_ctx *db_ctdb;
1186 char *db_path;
1188 if (!lp_clustering()) {
1189 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1190 return NULL;
1193 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1194 DEBUG(0, ("talloc failed\n"));
1195 TALLOC_FREE(result);
1196 return NULL;
1199 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1200 DEBUG(0, ("talloc failed\n"));
1201 TALLOC_FREE(result);
1202 return NULL;
1205 db_ctdb->transaction = NULL;
1206 db_ctdb->db = result;
1208 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1209 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1210 TALLOC_FREE(result);
1211 return NULL;
1214 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1216 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1218 /* only pass through specific flags */
1219 tdb_flags &= TDB_SEQNUM;
1221 /* honor permissions if user has specified O_CREAT */
1222 if (open_flags & O_CREAT) {
1223 chmod(db_path, mode);
1226 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1227 if (db_ctdb->wtdb == NULL) {
1228 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1229 TALLOC_FREE(result);
1230 return NULL;
1232 talloc_free(db_path);
1234 result->private_data = (void *)db_ctdb;
1235 result->fetch_locked = db_ctdb_fetch_locked;
1236 result->fetch = db_ctdb_fetch;
1237 result->traverse = db_ctdb_traverse;
1238 result->traverse_read = db_ctdb_traverse_read;
1239 result->get_seqnum = db_ctdb_get_seqnum;
1240 result->get_flags = db_ctdb_get_flags;
1241 result->transaction_start = db_ctdb_transaction_start;
1242 result->transaction_commit = db_ctdb_transaction_commit;
1243 result->transaction_cancel = db_ctdb_transaction_cancel;
1245 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1246 name, db_ctdb->db_id));
1248 return result;
1250 #endif