s3-libnet-samsync: pass back sequence number from fetch_sam_entries_keytab.
[Samba.git] / source / lib / dbwrap_ctdb.c
blob1e3a97f06548fa38dbe4a017e47363b025569123
1 /*
2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
20 #include "includes.h"
21 #ifdef CLUSTER_SUPPORT
22 #include "ctdb.h"
23 #include "ctdb_private.h"
24 #include "ctdbd_conn.h"
/*
 * Per-database state of one open (possibly nested) transaction.
 */
struct db_ctdb_transaction_handle {
	/* database context the transaction was started on */
	struct db_ctdb_ctx *ctx;
	/* set by ctdb_replay_transaction() while the logged operations are re-applied */
	bool in_replay;
	/*
	 * We store the reads and writes done under a transaction: one
	 * list stores both reads and writes, the other just writes.
	 */
	struct ctdb_marshall_buffer *m_all;
	struct ctdb_marshall_buffer *m_write;
	/* count of nested transaction_start calls that are still open */
	uint32_t nesting;
	/* a nested level was cancelled; the outermost commit then fails */
	bool nested_cancel;
};
38 struct db_ctdb_ctx {
39 struct db_context *db;
40 struct tdb_wrap *wtdb;
41 uint32 db_id;
42 struct db_ctdb_transaction_handle *transaction;
45 struct db_ctdb_rec {
46 struct db_ctdb_ctx *ctdb_ctx;
47 struct ctdb_ltdb_header header;
50 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
51 TALLOC_CTX *mem_ctx,
52 TDB_DATA key,
53 bool persistent);
55 static NTSTATUS tdb_error_to_ntstatus(struct tdb_context *tdb)
57 NTSTATUS status;
58 enum TDB_ERROR tret = tdb_error(tdb);
60 switch (tret) {
61 case TDB_ERR_EXISTS:
62 status = NT_STATUS_OBJECT_NAME_COLLISION;
63 break;
64 case TDB_ERR_NOEXIST:
65 status = NT_STATUS_OBJECT_NAME_NOT_FOUND;
66 break;
67 default:
68 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
69 break;
72 return status;
78 form a ctdb_rec_data record from a key/data pair
80 note that header may be NULL. If not NULL then it is included in the data portion
81 of the record
83 static struct ctdb_rec_data *db_ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
84 TDB_DATA key,
85 struct ctdb_ltdb_header *header,
86 TDB_DATA data)
88 size_t length;
89 struct ctdb_rec_data *d;
91 length = offsetof(struct ctdb_rec_data, data) + key.dsize +
92 data.dsize + (header?sizeof(*header):0);
93 d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
94 if (d == NULL) {
95 return NULL;
97 d->length = length;
98 d->reqid = reqid;
99 d->keylen = key.dsize;
100 memcpy(&d->data[0], key.dptr, key.dsize);
101 if (header) {
102 d->datalen = data.dsize + sizeof(*header);
103 memcpy(&d->data[key.dsize], header, sizeof(*header));
104 memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
105 } else {
106 d->datalen = data.dsize;
107 memcpy(&d->data[key.dsize], data.dptr, data.dsize);
109 return d;
113 /* helper function for marshalling multiple records */
114 static struct ctdb_marshall_buffer *db_ctdb_marshall_add(TALLOC_CTX *mem_ctx,
115 struct ctdb_marshall_buffer *m,
116 uint64_t db_id,
117 uint32_t reqid,
118 TDB_DATA key,
119 struct ctdb_ltdb_header *header,
120 TDB_DATA data)
122 struct ctdb_rec_data *r;
123 size_t m_size, r_size;
124 struct ctdb_marshall_buffer *m2;
126 r = db_ctdb_marshall_record(mem_ctx, reqid, key, header, data);
127 if (r == NULL) {
128 talloc_free(m);
129 return NULL;
132 if (m == NULL) {
133 m = talloc_zero_size(mem_ctx, offsetof(struct ctdb_marshall_buffer, data));
134 if (m == NULL) {
135 return NULL;
137 m->db_id = db_id;
140 m_size = talloc_get_size(m);
141 r_size = talloc_get_size(r);
143 m2 = talloc_realloc_size(mem_ctx, m, m_size + r_size);
144 if (m2 == NULL) {
145 talloc_free(m);
146 return NULL;
149 memcpy(m_size + (uint8_t *)m2, r, r_size);
151 talloc_free(r);
153 m2->count++;
155 return m2;
158 /* we've finished marshalling, return a data blob with the marshalled records */
159 static TDB_DATA db_ctdb_marshall_finish(struct ctdb_marshall_buffer *m)
161 TDB_DATA data;
162 data.dptr = (uint8_t *)m;
163 data.dsize = talloc_get_size(m);
164 return data;
168 loop over a marshalling buffer
170 - pass r==NULL to start
171 - loop the number of times indicated by m->count
173 static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buffer *m, struct ctdb_rec_data *r,
174 uint32_t *reqid,
175 struct ctdb_ltdb_header *header,
176 TDB_DATA *key, TDB_DATA *data)
178 if (r == NULL) {
179 r = (struct ctdb_rec_data *)&m->data[0];
180 } else {
181 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
184 if (reqid != NULL) {
185 *reqid = r->reqid;
188 if (key != NULL) {
189 key->dptr = &r->data[0];
190 key->dsize = r->keylen;
192 if (data != NULL) {
193 data->dptr = &r->data[r->keylen];
194 data->dsize = r->datalen;
195 if (header != NULL) {
196 data->dptr += sizeof(*header);
197 data->dsize -= sizeof(*header);
201 if (header != NULL) {
202 if (r->datalen < sizeof(*header)) {
203 return NULL;
205 *header = *(struct ctdb_ltdb_header *)&r->data[r->keylen];
208 return r;
213 /* start a transaction on a database */
214 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
216 tdb_transaction_cancel(h->ctx->wtdb->tdb);
217 return 0;
220 /* start a transaction on a database */
221 static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
223 struct db_record *rh;
224 TDB_DATA key;
225 TALLOC_CTX *tmp_ctx;
226 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
227 int ret;
228 struct db_ctdb_ctx *ctx = h->ctx;
229 TDB_DATA data;
231 key.dptr = discard_const(keyname);
232 key.dsize = strlen(keyname);
234 again:
235 tmp_ctx = talloc_new(h);
237 rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
238 if (rh == NULL) {
239 DEBUG(0,(__location__ " Failed to fetch_lock database\n"));
240 talloc_free(tmp_ctx);
241 return -1;
243 talloc_free(rh);
245 ret = tdb_transaction_start(ctx->wtdb->tdb);
246 if (ret != 0) {
247 DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
248 talloc_free(tmp_ctx);
249 return -1;
252 data = tdb_fetch(ctx->wtdb->tdb, key);
253 if ((data.dptr == NULL) ||
254 (data.dsize < sizeof(struct ctdb_ltdb_header)) ||
255 ((struct ctdb_ltdb_header *)data.dptr)->dmaster != get_my_vnn()) {
256 SAFE_FREE(data.dptr);
257 tdb_transaction_cancel(ctx->wtdb->tdb);
258 talloc_free(tmp_ctx);
259 goto again;
262 SAFE_FREE(data.dptr);
263 talloc_free(tmp_ctx);
265 return 0;
269 /* start a transaction on a database */
270 static int db_ctdb_transaction_start(struct db_context *db)
272 struct db_ctdb_transaction_handle *h;
273 int ret;
274 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
275 struct db_ctdb_ctx);
277 if (!db->persistent) {
278 DEBUG(0,("transactions not supported on non-persistent database 0x%08x\n",
279 ctx->db_id));
280 return -1;
283 if (ctx->transaction) {
284 ctx->transaction->nesting++;
285 return 0;
288 h = talloc_zero(db, struct db_ctdb_transaction_handle);
289 if (h == NULL) {
290 DEBUG(0,(__location__ " oom for transaction handle\n"));
291 return -1;
294 h->ctx = ctx;
296 ret = db_ctdb_transaction_fetch_start(h);
297 if (ret != 0) {
298 talloc_free(h);
299 return -1;
302 talloc_set_destructor(h, db_ctdb_transaction_destructor);
304 ctx->transaction = h;
306 DEBUG(5,(__location__ " Started transaction on db 0x%08x\n", ctx->db_id));
308 return 0;
314 fetch a record inside a transaction
316 static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
317 TALLOC_CTX *mem_ctx,
318 TDB_DATA key, TDB_DATA *data)
320 struct db_ctdb_transaction_handle *h = db->transaction;
322 *data = tdb_fetch(h->ctx->wtdb->tdb, key);
324 if (data->dptr != NULL) {
325 uint8_t *oldptr = (uint8_t *)data->dptr;
326 data->dsize -= sizeof(struct ctdb_ltdb_header);
327 if (data->dsize == 0) {
328 data->dptr = NULL;
329 } else {
330 data->dptr = (uint8 *)
331 talloc_memdup(
332 mem_ctx, data->dptr+sizeof(struct ctdb_ltdb_header),
333 data->dsize);
335 SAFE_FREE(oldptr);
336 if (data->dptr == NULL && data->dsize != 0) {
337 return -1;
341 if (!h->in_replay) {
342 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
343 if (h->m_all == NULL) {
344 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
345 data->dsize = 0;
346 talloc_free(data->dptr);
347 return -1;
351 return 0;
355 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag);
356 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec);
358 static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ctx,
359 TALLOC_CTX *mem_ctx,
360 TDB_DATA key)
362 struct db_record *result;
363 TDB_DATA ctdb_data;
365 if (!(result = talloc(mem_ctx, struct db_record))) {
366 DEBUG(0, ("talloc failed\n"));
367 return NULL;
370 result->private_data = ctx->transaction;
372 result->key.dsize = key.dsize;
373 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
374 if (result->key.dptr == NULL) {
375 DEBUG(0, ("talloc failed\n"));
376 TALLOC_FREE(result);
377 return NULL;
380 result->store = db_ctdb_store_transaction;
381 result->delete_rec = db_ctdb_delete_transaction;
383 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
384 if (ctdb_data.dptr == NULL) {
385 /* create the record */
386 result->value = tdb_null;
387 return result;
390 result->value.dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
391 result->value.dptr = NULL;
393 if ((result->value.dsize != 0)
394 && !(result->value.dptr = (uint8 *)talloc_memdup(
395 result, ctdb_data.dptr + sizeof(struct ctdb_ltdb_header),
396 result->value.dsize))) {
397 DEBUG(0, ("talloc failed\n"));
398 TALLOC_FREE(result);
401 SAFE_FREE(ctdb_data.dptr);
403 return result;
406 static int db_ctdb_record_destructor(struct db_record **recp)
408 struct db_record *rec = talloc_get_type_abort(*recp, struct db_record);
409 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
410 rec->private_data, struct db_ctdb_transaction_handle);
411 int ret = h->ctx->db->transaction_commit(h->ctx->db);
412 if (ret != 0) {
413 DEBUG(0,(__location__ " transaction_commit failed\n"));
415 return 0;
419 auto-create a transaction for persistent databases
421 static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
422 TALLOC_CTX *mem_ctx,
423 TDB_DATA key)
425 int res;
426 struct db_record *rec, **recp;
428 res = db_ctdb_transaction_start(ctx->db);
429 if (res == -1) {
430 return NULL;
433 rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
434 if (rec == NULL) {
435 ctx->db->transaction_cancel(ctx->db);
436 return NULL;
439 /* destroy this transaction when we release the lock */
440 recp = talloc(rec, struct db_record *);
441 if (recp == NULL) {
442 ctx->db->transaction_cancel(ctx->db);
443 talloc_free(rec);
444 return NULL;
446 *recp = rec;
447 talloc_set_destructor(recp, db_ctdb_record_destructor);
448 return rec;
453 stores a record inside a transaction
455 static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
456 TDB_DATA key, TDB_DATA data)
458 TALLOC_CTX *tmp_ctx = talloc_new(h);
459 int ret;
460 TDB_DATA rec;
461 struct ctdb_ltdb_header header;
463 /* we need the header so we can update the RSN */
464 rec = tdb_fetch(h->ctx->wtdb->tdb, key);
465 if (rec.dptr == NULL) {
466 /* the record doesn't exist - create one with us as dmaster.
467 This is only safe because we are in a transaction and this
468 is a persistent database */
469 ZERO_STRUCT(header);
470 header.dmaster = get_my_vnn();
471 } else {
472 memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
473 rec.dsize -= sizeof(struct ctdb_ltdb_header);
474 /* a special case, we are writing the same data that is there now */
475 if (data.dsize == rec.dsize &&
476 memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
477 SAFE_FREE(rec.dptr);
478 talloc_free(tmp_ctx);
479 return 0;
481 SAFE_FREE(rec.dptr);
484 header.rsn++;
486 if (!h->in_replay) {
487 h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
488 if (h->m_all == NULL) {
489 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
490 talloc_free(tmp_ctx);
491 return -1;
495 h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
496 if (h->m_write == NULL) {
497 DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
498 talloc_free(tmp_ctx);
499 return -1;
502 rec.dsize = data.dsize + sizeof(struct ctdb_ltdb_header);
503 rec.dptr = talloc_size(tmp_ctx, rec.dsize);
504 if (rec.dptr == NULL) {
505 DEBUG(0,(__location__ " Failed to alloc record\n"));
506 talloc_free(tmp_ctx);
507 return -1;
509 memcpy(rec.dptr, &header, sizeof(struct ctdb_ltdb_header));
510 memcpy(sizeof(struct ctdb_ltdb_header) + (uint8_t *)rec.dptr, data.dptr, data.dsize);
512 ret = tdb_store(h->ctx->wtdb->tdb, key, rec, TDB_REPLACE);
514 talloc_free(tmp_ctx);
516 return ret;
521 a record store inside a transaction
523 static NTSTATUS db_ctdb_store_transaction(struct db_record *rec, TDB_DATA data, int flag)
525 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
526 rec->private_data, struct db_ctdb_transaction_handle);
527 int ret;
529 ret = db_ctdb_transaction_store(h, rec->key, data);
530 if (ret != 0) {
531 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
533 return NT_STATUS_OK;
537 a record delete inside a transaction
539 static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
541 struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
542 rec->private_data, struct db_ctdb_transaction_handle);
543 int ret;
545 ret = db_ctdb_transaction_store(h, rec->key, tdb_null);
546 if (ret != 0) {
547 return tdb_error_to_ntstatus(h->ctx->wtdb->tdb);
549 return NT_STATUS_OK;
554 replay a transaction
556 static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
558 int ret, i;
559 struct ctdb_rec_data *rec = NULL;
561 h->in_replay = true;
562 talloc_free(h->m_write);
563 h->m_write = NULL;
565 ret = db_ctdb_transaction_fetch_start(h);
566 if (ret != 0) {
567 return ret;
570 for (i=0;i<h->m_all->count;i++) {
571 TDB_DATA key, data;
573 rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
574 if (rec == NULL) {
575 DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
576 goto failed;
579 if (rec->reqid == 0) {
580 /* its a store */
581 if (db_ctdb_transaction_store(h, key, data) != 0) {
582 goto failed;
584 } else {
585 TDB_DATA data2;
586 TALLOC_CTX *tmp_ctx = talloc_new(h);
588 if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
589 talloc_free(tmp_ctx);
590 goto failed;
592 if (data2.dsize != data.dsize ||
593 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
594 /* the record has changed on us - we have to give up */
595 talloc_free(tmp_ctx);
596 goto failed;
598 talloc_free(tmp_ctx);
602 return 0;
604 failed:
605 tdb_transaction_cancel(h->ctx->wtdb->tdb);
606 return -1;
611 commit a transaction
613 static int db_ctdb_transaction_commit(struct db_context *db)
615 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
616 struct db_ctdb_ctx);
617 NTSTATUS rets;
618 int ret;
619 int status;
620 int retries = 0;
621 struct db_ctdb_transaction_handle *h = ctx->transaction;
622 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
624 if (h == NULL) {
625 DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
626 return -1;
629 if (h->nested_cancel) {
630 db->transaction_cancel(db);
631 DEBUG(5,(__location__ " Failed transaction commit after nested cancel\n"));
632 return -1;
635 if (h->nesting != 0) {
636 h->nesting--;
637 return 0;
640 DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
642 talloc_set_destructor(h, NULL);
644 /* our commit strategy is quite complex.
646 - we first try to commit the changes to all other nodes
648 - if that works, then we commit locally and we are done
650 - if a commit on another node fails, then we need to cancel
651 the transaction, then restart the transaction (thus
652 opening a window of time for a pending recovery to
653 complete), then replay the transaction, checking all the
654 reads and writes (checking that reads give the same data,
655 and writes succeed). Then we retry the transaction to the
656 other nodes
659 again:
660 if (h->m_write == NULL) {
661 /* no changes were made, potentially after a retry */
662 tdb_transaction_cancel(h->ctx->wtdb->tdb);
663 talloc_free(h);
664 ctx->transaction = NULL;
665 return 0;
668 /* tell ctdbd to commit to the other nodes */
669 rets = ctdbd_control_local(messaging_ctdbd_connection(),
670 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY,
671 h->ctx->db_id, 0,
672 db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
673 if (!NT_STATUS_IS_OK(rets) || status != 0) {
674 tdb_transaction_cancel(h->ctx->wtdb->tdb);
675 sleep(1);
677 if (!NT_STATUS_IS_OK(rets)) {
678 failure_control = CTDB_CONTROL_TRANS2_ERROR;
679 } else {
680 /* work out what error code we will give if we
681 have to fail the operation */
682 switch ((enum ctdb_trans2_commit_error)status) {
683 case CTDB_TRANS2_COMMIT_SUCCESS:
684 case CTDB_TRANS2_COMMIT_SOMEFAIL:
685 case CTDB_TRANS2_COMMIT_TIMEOUT:
686 failure_control = CTDB_CONTROL_TRANS2_ERROR;
687 break;
688 case CTDB_TRANS2_COMMIT_ALLFAIL:
689 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
690 break;
694 if (++retries == 5) {
695 DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
696 h->ctx->db_id, retries, (unsigned)failure_control));
697 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
698 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
699 tdb_null, NULL, NULL, NULL);
700 h->ctx->transaction = NULL;
701 talloc_free(h);
702 ctx->transaction = NULL;
703 return -1;
706 if (ctdb_replay_transaction(h) != 0) {
707 DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
708 (unsigned)failure_control));
709 ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
710 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
711 tdb_null, NULL, NULL, NULL);
712 h->ctx->transaction = NULL;
713 talloc_free(h);
714 ctx->transaction = NULL;
715 return -1;
717 goto again;
718 } else {
719 failure_control = CTDB_CONTROL_TRANS2_ERROR;
722 /* do the real commit locally */
723 ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
724 if (ret != 0) {
725 DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
726 (unsigned)failure_control));
727 ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id,
728 CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
729 h->ctx->transaction = NULL;
730 talloc_free(h);
731 return ret;
734 /* tell ctdbd that we are finished with our local commit */
735 ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED,
736 h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY,
737 tdb_null, NULL, NULL, NULL);
738 h->ctx->transaction = NULL;
739 talloc_free(h);
740 return 0;
745 cancel a transaction
747 static int db_ctdb_transaction_cancel(struct db_context *db)
749 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
750 struct db_ctdb_ctx);
751 struct db_ctdb_transaction_handle *h = ctx->transaction;
753 if (h == NULL) {
754 DEBUG(0,(__location__ " transaction cancel with no open transaction on db 0x%08x\n", ctx->db_id));
755 return -1;
758 if (h->nesting != 0) {
759 h->nesting--;
760 h->nested_cancel = true;
761 return 0;
764 DEBUG(5,(__location__ " Cancel transaction on db 0x%08x\n", ctx->db_id));
766 ctx->transaction = NULL;
767 talloc_free(h);
768 return 0;
772 static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
774 struct db_ctdb_rec *crec = talloc_get_type_abort(
775 rec->private_data, struct db_ctdb_rec);
776 TDB_DATA cdata;
777 int ret;
779 cdata.dsize = sizeof(crec->header) + data.dsize;
781 if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
782 return NT_STATUS_NO_MEMORY;
785 memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
786 memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
788 ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key, cdata, TDB_REPLACE);
790 SAFE_FREE(cdata.dptr);
792 return (ret == 0) ? NT_STATUS_OK
793 : tdb_error_to_ntstatus(crec->ctdb_ctx->wtdb->tdb);
798 static NTSTATUS db_ctdb_delete(struct db_record *rec)
800 TDB_DATA data;
803 * We have to store the header with empty data. TODO: Fix the
804 * tdb-level cleanup
807 ZERO_STRUCT(data);
809 return db_ctdb_store(rec, data, 0);
813 static int db_ctdb_record_destr(struct db_record* data)
815 struct db_ctdb_rec *crec = talloc_get_type_abort(
816 data->private_data, struct db_ctdb_rec);
818 DEBUG(10, (DEBUGLEVEL > 10
819 ? "Unlocking db %u key %s\n"
820 : "Unlocking db %u key %.20s\n",
821 (int)crec->ctdb_ctx->db_id,
822 hex_encode(data, (unsigned char *)data->key.dptr,
823 data->key.dsize)));
825 if (tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, data->key) != 0) {
826 DEBUG(0, ("tdb_chainunlock failed\n"));
827 return -1;
830 return 0;
833 static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
834 TALLOC_CTX *mem_ctx,
835 TDB_DATA key,
836 bool persistent)
838 struct db_record *result;
839 struct db_ctdb_rec *crec;
840 NTSTATUS status;
841 TDB_DATA ctdb_data;
842 int migrate_attempts = 0;
844 if (!(result = talloc(mem_ctx, struct db_record))) {
845 DEBUG(0, ("talloc failed\n"));
846 return NULL;
849 if (!(crec = TALLOC_ZERO_P(result, struct db_ctdb_rec))) {
850 DEBUG(0, ("talloc failed\n"));
851 TALLOC_FREE(result);
852 return NULL;
855 result->private_data = (void *)crec;
856 crec->ctdb_ctx = ctx;
858 result->key.dsize = key.dsize;
859 result->key.dptr = (uint8 *)talloc_memdup(result, key.dptr, key.dsize);
860 if (result->key.dptr == NULL) {
861 DEBUG(0, ("talloc failed\n"));
862 TALLOC_FREE(result);
863 return NULL;
867 * Do a blocking lock on the record
869 again:
871 if (DEBUGLEVEL >= 10) {
872 char *keystr = hex_encode(result, key.dptr, key.dsize);
873 DEBUG(10, (DEBUGLEVEL > 10
874 ? "Locking db %u key %s\n"
875 : "Locking db %u key %.20s\n",
876 (int)crec->ctdb_ctx->db_id, keystr));
877 TALLOC_FREE(keystr);
880 if (tdb_chainlock(ctx->wtdb->tdb, key) != 0) {
881 DEBUG(3, ("tdb_chainlock failed\n"));
882 TALLOC_FREE(result);
883 return NULL;
886 result->store = db_ctdb_store;
887 result->delete_rec = db_ctdb_delete;
888 talloc_set_destructor(result, db_ctdb_record_destr);
890 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
893 * See if we have a valid record and we are the dmaster. If so, we can
894 * take the shortcut and just return it.
897 if ((ctdb_data.dptr == NULL) ||
898 (ctdb_data.dsize < sizeof(struct ctdb_ltdb_header)) ||
899 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster != get_my_vnn()
900 #if 0
901 || (random() % 2 != 0)
902 #endif
904 SAFE_FREE(ctdb_data.dptr);
905 tdb_chainunlock(ctx->wtdb->tdb, key);
906 talloc_set_destructor(result, NULL);
908 migrate_attempts += 1;
910 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
911 ctdb_data.dptr, ctdb_data.dptr ?
912 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster : -1,
913 get_my_vnn()));
915 status = ctdbd_migrate(messaging_ctdbd_connection(),ctx->db_id, key);
916 if (!NT_STATUS_IS_OK(status)) {
917 DEBUG(5, ("ctdb_migrate failed: %s\n",
918 nt_errstr(status)));
919 TALLOC_FREE(result);
920 return NULL;
922 /* now its migrated, try again */
923 goto again;
926 if (migrate_attempts > 10) {
927 DEBUG(0, ("db_ctdb_fetch_locked needed %d attempts\n",
928 migrate_attempts));
931 memcpy(&crec->header, ctdb_data.dptr, sizeof(crec->header));
933 result->value.dsize = ctdb_data.dsize - sizeof(crec->header);
934 result->value.dptr = NULL;
936 if ((result->value.dsize != 0)
937 && !(result->value.dptr = (uint8 *)talloc_memdup(
938 result, ctdb_data.dptr + sizeof(crec->header),
939 result->value.dsize))) {
940 DEBUG(0, ("talloc failed\n"));
941 TALLOC_FREE(result);
944 SAFE_FREE(ctdb_data.dptr);
946 return result;
949 static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
950 TALLOC_CTX *mem_ctx,
951 TDB_DATA key)
953 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
954 struct db_ctdb_ctx);
956 if (ctx->transaction != NULL) {
957 return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
960 if (db->persistent) {
961 return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
964 return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
968 fetch (unlocked, no migration) operation on ctdb
970 static int db_ctdb_fetch(struct db_context *db, TALLOC_CTX *mem_ctx,
971 TDB_DATA key, TDB_DATA *data)
973 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
974 struct db_ctdb_ctx);
975 NTSTATUS status;
976 TDB_DATA ctdb_data;
978 if (ctx->transaction) {
979 return db_ctdb_transaction_fetch(ctx, mem_ctx, key, data);
982 /* try a direct fetch */
983 ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
986 * See if we have a valid record and we are the dmaster. If so, we can
987 * take the shortcut and just return it.
988 * we bypass the dmaster check for persistent databases
990 if ((ctdb_data.dptr != NULL) &&
991 (ctdb_data.dsize >= sizeof(struct ctdb_ltdb_header)) &&
992 (db->persistent ||
993 ((struct ctdb_ltdb_header *)ctdb_data.dptr)->dmaster == get_my_vnn())) {
994 /* we are the dmaster - avoid the ctdb protocol op */
996 data->dsize = ctdb_data.dsize - sizeof(struct ctdb_ltdb_header);
997 if (data->dsize == 0) {
998 SAFE_FREE(ctdb_data.dptr);
999 data->dptr = NULL;
1000 return 0;
1003 data->dptr = (uint8 *)talloc_memdup(
1004 mem_ctx, ctdb_data.dptr+sizeof(struct ctdb_ltdb_header),
1005 data->dsize);
1007 SAFE_FREE(ctdb_data.dptr);
1009 if (data->dptr == NULL) {
1010 return -1;
1012 return 0;
1015 SAFE_FREE(ctdb_data.dptr);
1017 /* we weren't able to get it locally - ask ctdb to fetch it for us */
1018 status = ctdbd_fetch(messaging_ctdbd_connection(),ctx->db_id, key, mem_ctx, data);
1019 if (!NT_STATUS_IS_OK(status)) {
1020 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status)));
1021 return -1;
1024 return 0;
/*
 * Context handed to the traverse callbacks below.
 */
struct traverse_state {
	/* database being traversed */
	struct db_context *db;
	/* the caller's per-record function */
	int (*fn)(struct db_record *rec, void *private_data);
	/* opaque pointer passed through to fn */
	void *private_data;
};
1033 static void traverse_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1035 struct traverse_state *state = (struct traverse_state *)private_data;
1036 struct db_record *rec;
1037 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1038 /* we have to give them a locked record to prevent races */
1039 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, key);
1040 if (rec && rec->value.dsize > 0) {
1041 state->fn(rec, state->private_data);
1043 talloc_free(tmp_ctx);
1046 static int traverse_persistent_callback(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1047 void *private_data)
1049 struct traverse_state *state = (struct traverse_state *)private_data;
1050 struct db_record *rec;
1051 TALLOC_CTX *tmp_ctx = talloc_new(state->db);
1052 int ret = 0;
1053 /* we have to give them a locked record to prevent races */
1054 rec = db_ctdb_fetch_locked(state->db, tmp_ctx, kbuf);
1055 if (rec && rec->value.dsize > 0) {
1056 ret = state->fn(rec, state->private_data);
1058 talloc_free(tmp_ctx);
1059 return ret;
1062 static int db_ctdb_traverse(struct db_context *db,
1063 int (*fn)(struct db_record *rec,
1064 void *private_data),
1065 void *private_data)
1067 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1068 struct db_ctdb_ctx);
1069 struct traverse_state state;
1071 state.db = db;
1072 state.fn = fn;
1073 state.private_data = private_data;
1075 if (db->persistent) {
1076 /* for persistent databases we don't need to do a ctdb traverse,
1077 we can do a faster local traverse */
1078 return tdb_traverse(ctx->wtdb->tdb, traverse_persistent_callback, &state);
1082 ctdbd_traverse(ctx->db_id, traverse_callback, &state);
1083 return 0;
1086 static NTSTATUS db_ctdb_store_deny(struct db_record *rec, TDB_DATA data, int flag)
1088 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1091 static NTSTATUS db_ctdb_delete_deny(struct db_record *rec)
1093 return NT_STATUS_MEDIA_WRITE_PROTECTED;
1096 static void traverse_read_callback(TDB_DATA key, TDB_DATA data, void *private_data)
1098 struct traverse_state *state = (struct traverse_state *)private_data;
1099 struct db_record rec;
1100 rec.key = key;
1101 rec.value = data;
1102 rec.store = db_ctdb_store_deny;
1103 rec.delete_rec = db_ctdb_delete_deny;
1104 rec.private_data = state->db;
1105 state->fn(&rec, state->private_data);
1108 static int traverse_persistent_callback_read(TDB_CONTEXT *tdb, TDB_DATA kbuf, TDB_DATA dbuf,
1109 void *private_data)
1111 struct traverse_state *state = (struct traverse_state *)private_data;
1112 struct db_record rec;
1113 rec.key = kbuf;
1114 rec.value = dbuf;
1115 rec.store = db_ctdb_store_deny;
1116 rec.delete_rec = db_ctdb_delete_deny;
1117 rec.private_data = state->db;
1119 if (rec.value.dsize <= sizeof(struct ctdb_ltdb_header)) {
1120 /* a deleted record */
1121 return 0;
1123 rec.value.dsize -= sizeof(struct ctdb_ltdb_header);
1124 rec.value.dptr += sizeof(struct ctdb_ltdb_header);
1126 return state->fn(&rec, state->private_data);
1129 static int db_ctdb_traverse_read(struct db_context *db,
1130 int (*fn)(struct db_record *rec,
1131 void *private_data),
1132 void *private_data)
1134 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1135 struct db_ctdb_ctx);
1136 struct traverse_state state;
1138 state.db = db;
1139 state.fn = fn;
1140 state.private_data = private_data;
1142 if (db->persistent) {
1143 /* for persistent databases we don't need to do a ctdb traverse,
1144 we can do a faster local traverse */
1145 return tdb_traverse_read(ctx->wtdb->tdb, traverse_persistent_callback_read, &state);
1148 ctdbd_traverse(ctx->db_id, traverse_read_callback, &state);
1149 return 0;
1152 static int db_ctdb_get_seqnum(struct db_context *db)
1154 struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
1155 struct db_ctdb_ctx);
1156 return tdb_get_seqnum(ctx->wtdb->tdb);
1159 struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
1160 const char *name,
1161 int hash_size, int tdb_flags,
1162 int open_flags, mode_t mode)
1164 struct db_context *result;
1165 struct db_ctdb_ctx *db_ctdb;
1166 char *db_path;
1168 if (!lp_clustering()) {
1169 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
1170 return NULL;
1173 if (!(result = TALLOC_ZERO_P(mem_ctx, struct db_context))) {
1174 DEBUG(0, ("talloc failed\n"));
1175 TALLOC_FREE(result);
1176 return NULL;
1179 if (!(db_ctdb = TALLOC_P(result, struct db_ctdb_ctx))) {
1180 DEBUG(0, ("talloc failed\n"));
1181 TALLOC_FREE(result);
1182 return NULL;
1185 db_ctdb->transaction = NULL;
1186 db_ctdb->db = result;
1188 if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
1189 DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
1190 TALLOC_FREE(result);
1191 return NULL;
1194 db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
1196 result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
1198 /* only pass through specific flags */
1199 tdb_flags &= TDB_SEQNUM;
1201 /* honor permissions if user has specified O_CREAT */
1202 if (open_flags & O_CREAT) {
1203 chmod(db_path, mode);
1206 db_ctdb->wtdb = tdb_wrap_open(db_ctdb, db_path, hash_size, tdb_flags, O_RDWR, 0);
1207 if (db_ctdb->wtdb == NULL) {
1208 DEBUG(0, ("Could not open tdb %s: %s\n", db_path, strerror(errno)));
1209 TALLOC_FREE(result);
1210 return NULL;
1212 talloc_free(db_path);
1214 result->private_data = (void *)db_ctdb;
1215 result->fetch_locked = db_ctdb_fetch_locked;
1216 result->fetch = db_ctdb_fetch;
1217 result->traverse = db_ctdb_traverse;
1218 result->traverse_read = db_ctdb_traverse_read;
1219 result->get_seqnum = db_ctdb_get_seqnum;
1220 result->transaction_start = db_ctdb_transaction_start;
1221 result->transaction_commit = db_ctdb_transaction_commit;
1222 result->transaction_cancel = db_ctdb_transaction_cancel;
1224 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
1225 name, db_ctdb->db_id));
1227 return result;
1229 #endif