s3:dbwrap_ctdb: add a function db_ctdb_ltdb_store()
[Samba/bb.git] / source3 / lib / dbwrap_ctdb.c
blob1d207e7abf52bf88f7c093502430897b62a5be0b
1 /*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * State of a transaction that is open on a ctdb database.
 */
struct db_ctdb_transaction_handle {
	/* database context the transaction was started on */
	struct db_ctdb_ctx *ctx;
	/* set by ctdb_replay_transaction(); while set, operations are not
	 * recorded into m_all again */
	bool in_replay;
	/*
	 * we store the reads and writes done under a transaction:
	 * - one list stores both reads and writes (m_all),
	 * - the other just writes (m_write)
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* number of transaction_start calls nested inside the first one */
	uint32_t nesting;
	/* set when a nested level was cancelled; the commit then fails */
	bool nested_cancel;
};
40 struct db_ctdb_ctx {
41 struct db_context *db;
42 struct tdb_wrap *wtdb;
43 uint32 db_id;
44 struct db_ctdb_transaction_handle *transaction;
47 struct db_ctdb_rec {
48 struct db_ctdb_ctx *ctdb_ctx;
49 struct ctdb_ltdb_header header;
52 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
53 TALLOC_CTX *mem_ctx,
54 TDB_DATA key,
55 bool persistent);
57 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
59 NTSTATUS status;
60 enum TDB_ERROR tret = tdb_error(tdb);
62 switch (tret) {
63 case TDB_ERR_EXISTS:
64 status = NT_STATUS_OBJECT_NAME_COLLISION;
65 break;
66 case TDB_ERR_NOEXIST:
67 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
68 break;
69 default:
70 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
71 break;
74 return status;
79 * Store a record together with the ctdb record header
80 * in the local copy of the database.
82 static NTSTATUS db_ctdb_ltdb_store(struct db_ctdb_ctx *db,
83 TDB_DATA key,
84 struct ctdb_ltdb_header *header,
85 TDB_DATA data)
87 TALLOC_CTX *tmp_ctx = talloc_stackframe();
88 TDB_DATA rec;
89 int ret;
91 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
92 rec.dptr = (uint8_t *)talloc_size(tmp_ctx, rec.dsize);
94 if (rec.dptr == NULL) {
95 talloc_free(tmp_ctx);
96 return NT_STATUS_NO_MEMORY;
99 memcpy(rec.dptr, header, sizeof(struct ctdb_ltdb_header));
100 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
102 ret = tdb_store(db->wtdb->tdb, key, rec, TDB_REPLACE);
104 talloc_free(tmp_ctx);
106 return (ret == 0) ? NT_STATUS_OK
107 : tdb_error_to_ntstatus(db->wtdb->tdb);
112 form a ctdb_rec_data record from a key/data pair
114 note that header may be NULL. If not NULL then it is included in the data portion
115 of the record
117 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
118 TDB_DATA key,
119 struct ctdb_ltdb_header *header,
120 TDB_DATA data)
122 size_t length;
123 struct ctdb_rec_data *d;
125 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
126 data.dsize + (header?sizeof(*header):0);
127 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
128 if (d == NULL) {
129 return NULL;
131 d->length = length;
132 d->reqid = reqid;
133 d->keylen = key.dsize;
134 memcpy(&d->data[0], key.dptr, key.dsize);
135 if (header) {
136 d->datalen = data.dsize + sizeof(*header);
137 memcpy(&d->data[key.dsize], header, sizeof(*header));
138 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
139 } else {
140 d->datalen = data.dsize;
141 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
143 return d;
147 /* helper function for marshalling multiple records */
148 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
149 struct ctdb_marshall_buffer *m,
150 uint64_t db_id,
151 uint32_t reqid,
152 TDB_DATA key,
153 struct ctdb_ltdb_header *header,
154 TDB_DATA data)
156 struct ctdb_rec_data *r;
157 size_t m_size, r_size;
158 struct ctdb_marshall_buffer *m2 = NULL;
160 r = db_ctdb_marshall_record(talloc_tos(), reqid, key, header, data);
161 if (r == NULL) {
162 talloc_free(m);
163 return NULL;
166 if (m == NULL) {
167 m = (struct ctdb_marshall_buffer *)talloc_zero_size(
168 mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
169 if (m == NULL) {
170 goto done;
172 m->db_id = db_id;
175 m_size = talloc_get_size(m);
176 r_size = talloc_get_size(r);
178 m2 = (struct ctdb_marshall_buffer *)talloc_realloc_size(
179 mem_ctx, m, m_size + r_size);
180 if (m2 == NULL) {
181 talloc_free(m);
182 goto done;
185 memcpy(m_size + (uint8_t *)m2, r, r_size);
187 m2->count++;
189 done:
190 talloc_free(r);
191 return m2;
194 /* we've finished marshalling, return a data blob with the marshalled records */
195 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
197 TDB_DATA data;
198 data.dptr = (uint8_t *)m;
199 data.dsize = talloc_get_size(m);
200 return data;
204 loop over a marshalling buffer
206 - pass r==NULL to start
207 - loop the number of times indicated by m->count
209 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
210 uint32_t *reqid,
211 struct ctdb_ltdb_header *header,
212 TDB_DATA *key, TDB_DATA *data)
214 if (r == NULL) {
215 r = (struct ctdb_rec_data *)&m->data[0];
216 } else {
217 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
220 if (reqid != NULL) {
221 *reqid = r->reqid;
224 if (key != NULL) {
225 key->dptr = &r->data[0];
226 key->dsize = r->keylen;
228 if (data != NULL) {
229 data->dptr = &r->data[r->keylen];
230 data->dsize = r->datalen;
231 if (header != NULL) {
232 data->dptr += sizeof(*header);
233 data->dsize -= sizeof(*header);
237 if (header != NULL) {
238 if (r->datalen < sizeof(*header)) {
239 return NULL;
241 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
244 return r;
250 * CTDB transaction destructor
252 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
254 tdb_transaction_cancel(h->ctx->wtdb->tdb);
255 return 0;
259 * start a transaction on a ctdb database:
260 * - lock the transaction lock key
261 * - start the tdb transaction
263 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
265 struct db_record *rh;
266 TDB_DATA key;
267 TALLOC_CTX *tmp_ctx;
268 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
269 int ret;
270 struct db_ctdb_ctx *ctx = h->ctx;
271 TDB_DATA data;
273 key.dptr = (uint8_t *)discard_const(keyname);
274 key.dsize = strlen(keyname);
276 again:
277 tmp_ctx = talloc_new(h);
279 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
280 if (rh == NULL) {
281 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
282 talloc_free(tmp_ctx);
283 return -1;
285 talloc_free(rh);
287 ret = tdb_transaction_start(ctx->wtdb->tdb);
288 if (ret != 0) {
289 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
290 talloc_free(tmp_ctx);
291 return -1;
294 data = tdb_fetch(ctx->wtdb->tdb, key);
295 if ((data.dptr == NULL) ||
296 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
297 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
298 SAFE_FREE(data.dptr);
299 tdb_transaction_cancel(ctx->wtdb->tdb);
300 talloc_free(tmp_ctx);
301 goto again;
304 SAFE_FREE(data.dptr);
305 talloc_free(tmp_ctx);
307 return 0;
312 * CTDB dbwrap API: transaction_start function
313 * starts a transaction on a persistent database
315 static int db_ctdb_transaction_start(struct db_context *db)
317 struct db_ctdb_transaction_handle *h;
318 int ret;
319 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
320 struct db_ctdb_ctx);
322 if (!db->persistent) {
323 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
324 ctx->db_id));
325 return -1;
328 if (ctx->transaction) {
329 ctx->transaction->nesting++;
330 return 0;
333 h = talloc_zero(db, struct db_ctdb_transaction_handle);
334 if (h == NULL) {
335 DEBUG(0,(__location__ " oom for transaction handle\n"));
336 return -1;
339 h->ctx = ctx;
341 ret = db_ctdb_transaction_fetch_start(h);
342 if (ret != 0) {
343 talloc_free(h);
344 return -1;
347 talloc_set_destructor(h, db_ctdb_transaction_destructor);
349 ctx->transaction = h;
351 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
353 return 0;
359 fetch a record inside a transaction
361 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
362 TALLOC_CTX *mem_ctx,
363 TDB_DATA key, TDB_DATA *data)
365 struct db_ctdb_transaction_handle *h = db->transaction;
367 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
369 if (data->dptr != NULL) {
370 uint8_t *oldptr = (uint8_t *)data->dptr;
371 data->dsize -= sizeof(struct ctdb_ltdb_header);
372 if (data->dsize == 0) {
373 data->dptr = NULL;
374 } else {
375 data->dptr = (uint8 *)
376 talloc_memdup(
377 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
378 data->dsize);
380 SAFE_FREE(oldptr);
381 if (data->dptr == NULL && data->dsize != 0) {
382 return -1;
386 if (!h->in_replay) {
387 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
388 if (h->m_all == NULL) {
389 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
390 data->dsize = 0;
391 talloc_free(data->dptr);
392 return -1;
396 return 0;
400 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
401 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
403 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
404 TALLOC_CTX *mem_ctx,
405 TDB_DATA key)
407 struct db_record *result;
408 TDB_DATA ctdb_data;
410 if (!(result = talloc(mem_ctx, struct db_record))) {
411 DEBUG(0, ("talloc failed\n"));
412 return NULL;
415 result->private_data = ctx->transaction;
417 result->key.dsize = key.dsize;
418 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
419 if (result->key.dptr == NULL) {
420 DEBUG(0, ("talloc failed\n"));
421 TALLOC_FREE(result);
422 return NULL;
425 result->store = db_ctdb_store_transaction;
426 result->delete_rec = db_ctdb_delete_transaction;
428 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
429 if (ctdb_data.dptr == NULL) {
430 /* create the record */
431 result->value = tdb_null;
432 return result;
435 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
436 result->value.dptr = NULL;
438 if ((result->value.dsize != 0)
439 && !(result->value.dptr = (uint8 *)talloc_memdup(
440 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
441 result->value.dsize))) {
442 DEBUG(0, ("talloc failed\n"));
443 TALLOC_FREE(result);
446 SAFE_FREE(ctdb_data.dptr);
448 return result;
451 static int db_ctdb_record_destructor(struct db_record **recp)
453 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
454 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
455 rec->private_data, struct db_ctdb_transaction_handle);
456 int ret = h->ctx->db->transaction_commit(h->ctx->db);
457 if (ret != 0) {
458 DEBUG(0,(__location__ " transaction_commit failed\n"));
460 return 0;
464 auto-create a transaction for persistent databases
466 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
467 TALLOC_CTX *mem_ctx,
468 TDB_DATA key)
470 int res;
471 struct db_record *rec, **recp;
473 res = db_ctdb_transaction_start(ctx->db);
474 if (res == -1) {
475 return NULL;
478 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
479 if (rec == NULL) {
480 ctx->db->transaction_cancel(ctx->db);
481 return NULL;
484 /* destroy this transaction when we release the lock */
485 recp = talloc(rec, struct db_record *);
486 if (recp == NULL) {
487 ctx->db->transaction_cancel(ctx->db);
488 talloc_free(rec);
489 return NULL;
491 *recp = rec;
492 talloc_set_destructor(recp, db_ctdb_record_destructor);
493 return rec;
498 stores a record inside a transaction
500 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
501 TDB_DATA key, TDB_DATA data)
503 TALLOC_CTX *tmp_ctx = talloc_new(h);
504 int ret;
505 TDB_DATA rec;
506 struct ctdb_ltdb_header header;
507 NTSTATUS status;
509 /* we need the header so we can update the RSN */
510 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
511 if (rec.dptr == NULL) {
512 /* the record doesn't exist - create one with us as dmaster.
513 This is only safe because we are in a transaction and this
514 is a persistent database */
515 ZERO_STRUCT(header);
516 } else {
517 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
518 rec.dsize -= sizeof(struct ctdb_ltdb_header);
519 /* a special case, we are writing the same data that is there now */
520 if (data.dsize == rec.dsize &&
521 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
522 SAFE_FREE(rec.dptr);
523 talloc_free(tmp_ctx);
524 return 0;
526 SAFE_FREE(rec.dptr);
529 header.dmaster = get_my_vnn();
530 header.rsn++;
532 if (!h->in_replay) {
533 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
534 if (h->m_all == NULL) {
535 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
536 talloc_free(tmp_ctx);
537 return -1;
541 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
542 if (h->m_write == NULL) {
543 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
544 talloc_free(tmp_ctx);
545 return -1;
548 status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
549 if (NT_STATUS_IS_OK(status)) {
550 ret = 0;
551 } else {
552 ret = -1;
555 talloc_free(tmp_ctx);
557 return ret;
562 a record store inside a transaction
564 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
566 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
567 rec->private_data, struct db_ctdb_transaction_handle);
568 int ret;
570 ret = db_ctdb_transaction_store(h, rec->key, data);
571 if (ret != 0) {
572 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
574 return NT_STATUS_OK;
578 a record delete inside a transaction
580 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
582 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
583 rec->private_data, struct db_ctdb_transaction_handle);
584 int ret;
586 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
587 if (ret != 0) {
588 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
590 return NT_STATUS_OK;
595 replay a transaction
597 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
599 int ret, i;
600 struct ctdb_rec_data *rec = NULL;
602 h->in_replay = true;
603 talloc_free(h->m_write);
604 h->m_write = NULL;
606 ret = db_ctdb_transaction_fetch_start(h);
607 if (ret != 0) {
608 return ret;
611 for (i=0;i<h->m_all->count;i++) {
612 TDB_DATA key, data;
614 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
615 if (rec == NULL) {
616 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
617 goto failed;
620 if (rec->reqid == 0) {
621 /* its a store */
622 if (db_ctdb_transaction_store(h, key, data) != 0) {
623 goto failed;
625 } else {
626 TDB_DATA data2;
627 TALLOC_CTX *tmp_ctx = talloc_new(h);
629 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
630 talloc_free(tmp_ctx);
631 goto failed;
633 if (data2.dsize != data.dsize ||
634 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
635 /* the record has changed on us - we have to give up */
636 talloc_free(tmp_ctx);
637 goto failed;
639 talloc_free(tmp_ctx);
643 return 0;
645 failed:
646 tdb_transaction_cancel(h->ctx->wtdb->tdb);
647 return -1;
652 commit a transaction
654 static int db_ctdb_transaction_commit(struct db_context *db)
656 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
657 struct db_ctdb_ctx);
658 NTSTATUS rets;
659 int ret;
660 int status;
661 int retries = 0;
662 struct db_ctdb_transaction_handle *h = ctx->transaction;
663 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
665 if (h == NULL) {
666 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
667 return -1;
670 if (h->nested_cancel) {
671 db->transaction_cancel(db);
672 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
673 return -1;
676 if (h->nesting != 0) {
677 h->nesting--;
678 return 0;
681 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
683 talloc_set_destructor(h, NULL);
685 /* our commit strategy is quite complex.
687 - we first try to commit the changes to all other nodes
689 - if that works, then we commit locally and we are done
691 - if a commit on another node fails, then we need to cancel
692 the transaction, then restart the transaction (thus
693 opening a window of time for a pending recovery to
694 complete), then replay the transaction, checking all the
695 reads and writes (checking that reads give the same data,
696 and writes succeed). Then we retry the transaction to the
697 other nodes
700 again:
701 if (h->m_write == NULL) {
702 /* no changes were made, potentially after a retry */
703 tdb_transaction_cancel(h->ctx->wtdb->tdb);
704 talloc_free(h);
705 ctx->transaction = NULL;
706 return 0;
709 /* tell ctdbd to commit to the other nodes */
710 rets = ctdbd_control_local(messaging_ctdbd_connection(),
711 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
712 h->ctx->db_id, 0,
713 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
714 if (!NT_STATUS_IS_OK(rets) || status != 0) {
715 tdb_transaction_cancel(h->ctx->wtdb->tdb);
716 sleep(1);
718 if (!NT_STATUS_IS_OK(rets)) {
719 failure_control = CTDB_CONTROL_TRANS2_ERROR;
720 } else {
721 /* work out what error code we will give if we
722 have to fail the operation */
723 switch ((enum ctdb_trans2_commit_error)status) {
724 case CTDB_TRANS2_COMMIT_SUCCESS:
725 case CTDB_TRANS2_COMMIT_SOMEFAIL:
726 case CTDB_TRANS2_COMMIT_TIMEOUT:
727 failure_control = CTDB_CONTROL_TRANS2_ERROR;
728 break;
729 case CTDB_TRANS2_COMMIT_ALLFAIL:
730 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
731 break;
735 if (++retries == 5) {
736 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
737 h->ctx->db_id, retries, (unsigned)failure_control));
738 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
739 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
740 tdb_null, NULL, NULL, NULL);
741 h->ctx->transaction = NULL;
742 talloc_free(h);
743 ctx->transaction = NULL;
744 return -1;
747 if (ctdb_replay_transaction(h) != 0) {
748 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
749 (unsigned)failure_control));
750 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
751 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
752 tdb_null, NULL, NULL, NULL);
753 h->ctx->transaction = NULL;
754 talloc_free(h);
755 ctx->transaction = NULL;
756 return -1;
758 goto again;
759 } else {
760 failure_control = CTDB_CONTROL_TRANS2_ERROR;
763 /* do the real commit locally */
764 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
765 if (ret != 0) {
766 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
767 (unsigned)failure_control));
768 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
769 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
770 h->ctx->transaction = NULL;
771 talloc_free(h);
772 return ret;
775 /* tell ctdbd that we are finished with our local commit */
776 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
777 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
778 tdb_null, NULL, NULL, NULL);
779 h->ctx->transaction = NULL;
780 talloc_free(h);
781 return 0;
786 cancel a transaction
788 static int db_ctdb_transaction_cancel(struct db_context *db)
790 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
791 struct db_ctdb_ctx);
792 struct db_ctdb_transaction_handle *h = ctx->transaction;
794 if (h == NULL) {
795 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
796 return -1;
799 if (h->nesting != 0) {
800 h->nesting--;
801 h->nested_cancel = true;
802 return 0;
805 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
807 ctx->transaction = NULL;
808 talloc_free(h);
809 return 0;
813 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
815 struct db_ctdb_rec *crec = talloc_get_type_abort(
816 rec->private_data, struct db_ctdb_rec);
818 return db_ctdb_ltdb_store(crec->ctdb_ctx, rec->key, &(crec->header), data);
823 static NTSTATUS db_ctdb_delete(struct db_record *rec)
825 TDB_DATA data;
828 * We have to store the header with empty data. TODO: Fix the
829 * tdb-level cleanup
832 ZERO_STRUCT(data);
834 return db_ctdb_store(rec, data, 0);
838 static int db_ctdb_record_destr(struct db_record* data)
840 struct db_ctdb_rec *crec = talloc_get_type_abort(
841 data->private_data, struct db_ctdb_rec);
843 DEBUG(10, (DEBUGLEVEL > 10
844 ? "Unlocking db %u key %s\n"
845 : "Unlocking db %u key %.20s\n",
846 (int)crec->ctdb_ctx->db_id,
847 hex_encode_talloc(data, (unsigned char *)data->key.dptr,
848 data->key.dsize)));
850 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
851 DEBUG(0, ("tdb_chainunlock failed\n"));
852 return -1;
855 return 0;
858 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
859 TALLOC_CTX *mem_ctx,
860 TDB_DATA key,
861 bool persistent)
863 struct db_record *result;
864 struct db_ctdb_rec *crec;
865 NTSTATUS status;
866 TDB_DATA ctdb_data;
867 int migrate_attempts = 0;
869 if (!(result = talloc(mem_ctx, struct db_record))) {
870 DEBUG(0, ("talloc failed\n"));
871 return NULL;
874 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
875 DEBUG(0, ("talloc failed\n"));
876 TALLOC_FREE(result);
877 return NULL;
880 result->private_data = (void *)crec;
881 crec->ctdb_ctx = ctx;
883 result->key.dsize = key.dsize;
884 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
885 if (result->key.dptr == NULL) {
886 DEBUG(0, ("talloc failed\n"));
887 TALLOC_FREE(result);
888 return NULL;
892 * Do a blocking lock on the record
894 again:
896 if (DEBUGLEVEL >= 10) {
897 char *keystr = hex_encode_talloc(result, key.dptr, key.dsize);
898 DEBUG(10, (DEBUGLEVEL > 10
899 ? "Locking db %u key %s\n"
900 : "Locking db %u key %.20s\n",
901 (int)crec->ctdb_ctx->db_id, keystr));
902 TALLOC_FREE(keystr);
905 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
906 DEBUG(3, ("tdb_chainlock failed\n"));
907 TALLOC_FREE(result);
908 return NULL;
911 result->store = db_ctdb_store;
912 result->delete_rec = db_ctdb_delete;
913 talloc_set_destructor(result, db_ctdb_record_destr);
915 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
918 * See if we have a valid record and we are the dmaster. If so, we can
919 * take the shortcut and just return it.
922 if ((ctdb_data.dptr == NULL) ||
923 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
924 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
925 #if 0
926 || (random() % 2 != 0)
927 #endif
929 SAFE_FREE(ctdb_data.dptr);
930 tdb_chainunlock(ctx->wtdb->tdb, key);
931 talloc_set_destructor(result, NULL);
933 migrate_attempts += 1;
935 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
936 ctdb_data.dptr, ctdb_data.dptr ?
937 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
938 get_my_vnn()));
940 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
941 if (!NT_STATUS_IS_OK(status)) {
942 DEBUG(5, ("ctdb_migrate failed: %s\n",
943 nt_errstr(status)));
944 TALLOC_FREE(result);
945 return NULL;
947 /* now its migrated, try again */
948 goto again;
951 if (migrate_attempts > 10) {
952 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
953 migrate_attempts));
956 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
958 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
959 result->value.dptr = NULL;
961 if ((result->value.dsize != 0)
962 && !(result->value.dptr = (uint8 *)talloc_memdup(
963 result, ctdb_data.dptr + sizeof(crec->header),
964 result->value.dsize))) {
965 DEBUG(0, ("talloc failed\n"));
966 TALLOC_FREE(result);
969 SAFE_FREE(ctdb_data.dptr);
971 return result;
974 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
975 TALLOC_CTX *mem_ctx,
976 TDB_DATA key)
978 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
979 struct db_ctdb_ctx);
981 if (ctx->transaction != NULL) {
982 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
985 if (db->persistent) {
986 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
989 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
993 fetch (unlocked, no migration) operation on ctdb
995 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
996 TDB_DATA key, TDB_DATA *data)
998 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
999 struct db_ctdb_ctx);
1000 NTSTATUS status;
1001 TDB_DATA ctdb_data;
1003 if (ctx->transaction) {
1004 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
1007 /* try a direct fetch */
1008 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
1011 * See if we have a valid record and we are the dmaster. If so, we can
1012 * take the shortcut and just return it.
1013 * we bypass the dmaster check for persistent databases
1015 if ((ctdb_data.dptr != NULL) &&
1016 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
1017 (db->persistent ||
1018 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
1019 /* we are the dmaster - avoid the ctdb protocol op */
1021 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
1022 if (data->dsize == 0) {
1023 SAFE_FREE(ctdb_data.dptr);
1024 data->dptr = NULL;
1025 return 0;
1028 data->dptr = (uint8 *)talloc_memdup(
1029 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1030 data->dsize);
1032 SAFE_FREE(ctdb_data.dptr);
1034 if (data->dptr == NULL) {
1035 return -1;
1037 return 0;
1040 SAFE_FREE(ctdb_data.dptr);
1042 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1043 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1044 if (!NT_STATUS_IS_OK(status)) {
1045 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1046 return -1;
1049 return 0;
/*
 * Context handed to the traverse callbacks below.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* caller's per-record callback */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1058 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1060 struct traverse_state *state = (struct traverse_state *)private_data;
1061 struct db_record *rec;
1062 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1063 /* we have to give them a locked record to prevent races */
1064 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1065 if (rec && rec->value.dsize > 0) {
1066 state->fn(rec, state->private_data);
1068 talloc_free(tmp_ctx);
1071 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1072 void *private_data)
1074 struct traverse_state *state = (struct traverse_state *)private_data;
1075 struct db_record *rec;
1076 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1077 int ret = 0;
1078 /* we have to give them a locked record to prevent races */
1079 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1080 if (rec && rec->value.dsize > 0) {
1081 ret = state->fn(rec, state->private_data);
1083 talloc_free(tmp_ctx);
1084 return ret;
1087 static int db_ctdb_traverse(struct db_context *db,
1088 int (*fn)(struct db_record *rec,
1089 void *private_data),
1090 void *private_data)
1092 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1093 struct db_ctdb_ctx);
1094 struct traverse_state state;
1096 state.db = db;
1097 state.fn = fn;
1098 state.private_data = private_data;
1100 if (db->persistent) {
1101 /* for persistent databases we don't need to do a ctdb traverse,
1102 we can do a faster local traverse */
1103 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1107 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1108 return 0;
1111 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1113 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1116 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1118 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1121 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1123 struct traverse_state *state = (struct traverse_state *)private_data;
1124 struct db_record rec;
1125 rec.key = key;
1126 rec.value = data;
1127 rec.store = db_ctdb_store_deny;
1128 rec.delete_rec = db_ctdb_delete_deny;
1129 rec.private_data = state->db;
1130 state->fn(&rec, state->private_data);
1133 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1134 void *private_data)
1136 struct traverse_state *state = (struct traverse_state *)private_data;
1137 struct db_record rec;
1138 rec.key = kbuf;
1139 rec.value = dbuf;
1140 rec.store = db_ctdb_store_deny;
1141 rec.delete_rec = db_ctdb_delete_deny;
1142 rec.private_data = state->db;
1144 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1145 /* a deleted record */
1146 return 0;
1148 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1149 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1151 return state->fn(&rec, state->private_data);
1154 static int db_ctdb_traverse_read(struct db_context *db,
1155 int (*fn)(struct db_record *rec,
1156 void *private_data),
1157 void *private_data)
1159 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1160 struct db_ctdb_ctx);
1161 struct traverse_state state;
1163 state.db = db;
1164 state.fn = fn;
1165 state.private_data = private_data;
1167 if (db->persistent) {
1168 /* for persistent databases we don't need to do a ctdb traverse,
1169 we can do a faster local traverse */
1170 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1173 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1174 return 0;
1177 static int db_ctdb_get_seqnum(struct db_context *db)
1179 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1180 struct db_ctdb_ctx);
1181 return tdb_get_seqnum(ctx->wtdb->tdb);
1184 static int db_ctdb_get_flags(struct db_context *db)
1186 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1187 struct db_ctdb_ctx);
1188 return tdb_get_flags(ctx->wtdb->tdb);
1191 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1192 const char *name,
1193 int hash_size, int tdb_flags,
1194 int open_flags, mode_t mode)
1196 struct db_context *result;
1197 struct db_ctdb_ctx *db_ctdb;
1198 char *db_path;
1200 if (!lp_clustering()) {
1201 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1202 return NULL;
1205 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1206 DEBUG(0, ("talloc failed\n"));
1207 TALLOC_FREE(result);
1208 return NULL;
1211 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1212 DEBUG(0, ("talloc failed\n"));
1213 TALLOC_FREE(result);
1214 return NULL;
1217 db_ctdb->transaction = NULL;
1218 db_ctdb->db = result;
1220 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1221 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1222 TALLOC_FREE(result);
1223 return NULL;
1226 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1228 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1230 /* only pass through specific flags */
1231 tdb_flags &= TDB_SEQNUM;
1233 /* honor permissions if user has specified O_CREAT */
1234 if (open_flags & O_CREAT) {
1235 chmod(db_path, mode);
1238 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1239 if (db_ctdb->wtdb == NULL) {
1240 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1241 TALLOC_FREE(result);
1242 return NULL;
1244 talloc_free(db_path);
1246 result->private_data = (void *)db_ctdb;
1247 result->fetch_locked = db_ctdb_fetch_locked;
1248 result->fetch = db_ctdb_fetch;
1249 result->traverse = db_ctdb_traverse;
1250 result->traverse_read = db_ctdb_traverse_read;
1251 result->get_seqnum = db_ctdb_get_seqnum;
1252 result->get_flags = db_ctdb_get_flags;
1253 result->transaction_start = db_ctdb_transaction_start;
1254 result->transaction_commit = db_ctdb_transaction_commit;
1255 result->transaction_cancel = db_ctdb_transaction_cancel;
1257 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1258 name, db_ctdb->db_id));
1260 return result;
1262 #endif