auth/gensec: add some useful debugging to gensec_update_send/gensec_update_done
[Samba.git] / ctdb / common / ctdb_ltdb.c
blobf4f216e1ee365d30962891e681985164e3570347
1 /*
2 ctdb ltdb code
4 Copyright (C) Andrew Tridgell 2006
5 Copyright (C) Ronnie sahlberg 2011
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "replace.h"
22 #include "system/network.h"
23 #include "system/filesys.h"
25 #include <tdb.h>
27 #include "lib/tdb_wrap/tdb_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/util/debug.h"
31 #include "ctdb_private.h"
33 #include "common/common.h"
34 #include "common/logging.h"
38 * Calculate tdb flags based on databse type
40 int ctdb_db_tdb_flags(uint8_t db_flags, bool with_valgrind, bool with_mutex)
42 int tdb_flags = 0;
44 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
45 tdb_flags = TDB_DEFAULT;
47 } else if (db_flags & CTDB_DB_FLAGS_REPLICATED) {
48 tdb_flags = TDB_NOSYNC |
49 TDB_CLEAR_IF_FIRST |
50 TDB_INCOMPATIBLE_HASH;
52 } else {
53 tdb_flags = TDB_NOSYNC |
54 TDB_CLEAR_IF_FIRST |
55 TDB_INCOMPATIBLE_HASH;
57 #ifdef TDB_MUTEX_LOCKING
58 if (with_mutex && tdb_runtime_check_for_robust_mutexes()) {
59 tdb_flags |= TDB_MUTEX_LOCKING;
61 #endif
65 tdb_flags |= TDB_DISALLOW_NESTING;
66 if (with_valgrind) {
67 tdb_flags |= TDB_NOMMAP;
70 return tdb_flags;
74 find an attached ctdb_db handle given a name
76 struct ctdb_db_context *ctdb_db_handle(struct ctdb_context *ctdb, const char *name)
78 struct ctdb_db_context *tmp_db;
79 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
80 if (strcmp(name, tmp_db->db_name) == 0) {
81 return tmp_db;
84 return NULL;
87 bool ctdb_db_persistent(struct ctdb_db_context *ctdb_db)
89 if (ctdb_db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
90 return true;
92 return false;
95 bool ctdb_db_replicated(struct ctdb_db_context *ctdb_db)
97 if (ctdb_db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
98 return true;
100 return false;
103 bool ctdb_db_volatile(struct ctdb_db_context *ctdb_db)
105 if ((ctdb_db->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
106 (ctdb_db->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
107 return false;
109 return true;
112 bool ctdb_db_readonly(struct ctdb_db_context *ctdb_db)
114 if (ctdb_db->db_flags & CTDB_DB_FLAGS_READONLY) {
115 return true;
117 return false;
120 void ctdb_db_set_readonly(struct ctdb_db_context *ctdb_db)
122 ctdb_db->db_flags |= CTDB_DB_FLAGS_READONLY;
125 void ctdb_db_reset_readonly(struct ctdb_db_context *ctdb_db)
127 ctdb_db->db_flags &= ~CTDB_DB_FLAGS_READONLY;
130 bool ctdb_db_sticky(struct ctdb_db_context *ctdb_db)
132 if (ctdb_db->db_flags & CTDB_DB_FLAGS_STICKY) {
133 return true;
135 return false;
138 void ctdb_db_set_sticky(struct ctdb_db_context *ctdb_db)
140 ctdb_db->db_flags |= CTDB_DB_FLAGS_STICKY;
144 return the lmaster given a key
146 uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
148 uint32_t idx, lmaster;
150 idx = ctdb_hash(key) % ctdb->vnn_map->size;
151 lmaster = ctdb->vnn_map->map[idx];
153 return lmaster;
158 construct an initial header for a record with no ltdb header yet
160 static void ltdb_initial_header(struct ctdb_db_context *ctdb_db,
161 TDB_DATA key,
162 struct ctdb_ltdb_header *header)
164 ZERO_STRUCTP(header);
165 /* initial dmaster is the lmaster */
166 header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
167 header->flags = CTDB_REC_FLAG_AUTOMATIC;
172 fetch a record from the ltdb, separating out the header information
173 and returning the body of the record. A valid (initial) header is
174 returned if the record is not present
176 int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
177 TDB_DATA key, struct ctdb_ltdb_header *header,
178 TALLOC_CTX *mem_ctx, TDB_DATA *data)
180 TDB_DATA rec;
181 struct ctdb_context *ctdb = ctdb_db->ctdb;
183 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
184 if (rec.dsize < sizeof(*header)) {
185 /* return an initial header */
186 if (rec.dptr) free(rec.dptr);
187 if (ctdb->vnn_map == NULL) {
188 /* called from the client */
189 ZERO_STRUCTP(data);
190 header->dmaster = (uint32_t)-1;
191 return -1;
193 ltdb_initial_header(ctdb_db, key, header);
194 if (data) {
195 *data = tdb_null;
197 if (ctdb_db_persistent(ctdb_db) ||
198 header->dmaster == ctdb_db->ctdb->pnn) {
199 if (ctdb_ltdb_store(ctdb_db, key, header, tdb_null) != 0) {
200 DEBUG(DEBUG_NOTICE,
201 (__location__ "failed to store initial header\n"));
204 return 0;
207 *header = *(struct ctdb_ltdb_header *)rec.dptr;
209 if (data) {
210 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
211 data->dptr = talloc_memdup(mem_ctx,
212 sizeof(struct ctdb_ltdb_header)+rec.dptr,
213 data->dsize);
216 free(rec.dptr);
217 if (data) {
218 CTDB_NO_MEMORY(ctdb, data->dptr);
221 return 0;
225 fetch a record from the ltdb, separating out the header information
226 and returning the body of the record.
227 if the record does not exist, *header will be NULL
228 and data = {0, NULL}
230 int ctdb_ltdb_fetch_with_header(struct ctdb_db_context *ctdb_db,
231 TDB_DATA key, struct ctdb_ltdb_header *header,
232 TALLOC_CTX *mem_ctx, TDB_DATA *data)
234 TDB_DATA rec;
236 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
237 if (rec.dsize < sizeof(*header)) {
238 free(rec.dptr);
240 data->dsize = 0;
241 data->dptr = NULL;
242 return -1;
245 *header = *(struct ctdb_ltdb_header *)rec.dptr;
246 if (data) {
247 data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
248 data->dptr = talloc_memdup(mem_ctx,
249 sizeof(struct ctdb_ltdb_header)+rec.dptr,
250 data->dsize);
253 free(rec.dptr);
255 return 0;
260 write a record to a normal database
262 int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
263 struct ctdb_ltdb_header *header, TDB_DATA data)
265 struct ctdb_context *ctdb = ctdb_db->ctdb;
266 TDB_DATA rec[2];
267 uint32_t hsize = sizeof(struct ctdb_ltdb_header);
268 int ret;
269 bool seqnum_suppressed = false;
271 if (ctdb_db->ctdb_ltdb_store_fn) {
272 return ctdb_db->ctdb_ltdb_store_fn(ctdb_db, key, header, data);
275 if (ctdb->flags & CTDB_FLAG_TORTURE) {
276 TDB_DATA old;
277 struct ctdb_ltdb_header *h2;
279 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
280 h2 = (struct ctdb_ltdb_header *)old.dptr;
281 if (old.dptr != NULL && old.dsize >= hsize &&
282 h2->rsn > header->rsn) {
283 DEBUG(DEBUG_ERR,
284 ("RSN regression! %"PRIu64" %"PRIu64"\n",
285 h2->rsn, header->rsn));
287 if (old.dptr != NULL) {
288 free(old.dptr);
292 rec[0].dsize = hsize;
293 rec[0].dptr = (uint8_t *)header;
295 rec[1].dsize = data.dsize;
296 rec[1].dptr = data.dptr;
298 /* Databases with seqnum updates enabled only get their seqnum
299 changes when/if we modify the data */
300 if (ctdb_db->seqnum_update != NULL) {
301 TDB_DATA old;
302 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
304 if ((old.dsize == hsize + data.dsize) &&
305 memcmp(old.dptr+hsize, data.dptr, data.dsize) == 0) {
306 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
307 seqnum_suppressed = true;
309 if (old.dptr != NULL) {
310 free(old.dptr);
313 ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
314 if (ret != 0) {
315 DEBUG(DEBUG_ERR, (__location__ " Failed to store dynamic data\n"));
317 if (seqnum_suppressed) {
318 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
321 return ret;
325 lock a record in the ltdb, given a key
327 int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
329 return tdb_chainlock(ctdb_db->ltdb->tdb, key);
333 unlock a record in the ltdb, given a key
335 int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
337 int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
338 if (ret != 0) {
339 DEBUG(DEBUG_ERR,("tdb_chainunlock failed on db %s [%s]\n", ctdb_db->db_name, tdb_errorstr(ctdb_db->ltdb->tdb)));
341 return ret;
346 delete a record from a normal database
348 int ctdb_ltdb_delete(struct ctdb_db_context *ctdb_db, TDB_DATA key)
350 if (! ctdb_db_volatile(ctdb_db)) {
351 DEBUG(DEBUG_WARNING,
352 ("Ignored deletion of empty record from "
353 "non-volatile database\n"));
354 return 0;
356 if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
357 DEBUG(DEBUG_ERR,("Failed to delete empty record."));
358 return -1;
360 return 0;
363 int ctdb_trackingdb_add_pnn(struct ctdb_context *ctdb, TDB_DATA *data, uint32_t pnn)
365 int byte_pos = pnn / 8;
366 int bit_mask = 1 << (pnn % 8);
368 if (byte_pos + 1 > data->dsize) {
369 char *buf;
371 buf = malloc(byte_pos + 1);
372 memset(buf, 0, byte_pos + 1);
373 if (buf == NULL) {
374 DEBUG(DEBUG_ERR, ("Out of memory when allocating buffer of %d bytes for trackingdb\n", byte_pos + 1));
375 return -1;
377 if (data->dptr != NULL) {
378 memcpy(buf, data->dptr, data->dsize);
379 free(data->dptr);
381 data->dptr = (uint8_t *)buf;
382 data->dsize = byte_pos + 1;
385 data->dptr[byte_pos] |= bit_mask;
386 return 0;
389 void ctdb_trackingdb_traverse(struct ctdb_context *ctdb, TDB_DATA data, ctdb_trackingdb_cb cb, void *private_data)
391 int i;
393 for(i = 0; i < data.dsize; i++) {
394 int j;
396 for (j=0; j<8; j++) {
397 int mask = 1<<j;
399 if (data.dptr[i] & mask) {
400 cb(ctdb, i * 8 + j, private_data);
/*
  the dummy null procedure that all databases support
  ignores the call and always returns 0
*/
int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;

	return 0;
}
415 this is a plain fetch procedure that all databases support
417 int ctdb_fetch_func(struct ctdb_call_info *call)
419 call->reply_data = &call->record_data;
420 return 0;
424 this is a plain fetch procedure that all databases support
425 this returns the full record including the ltdb header
427 int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
429 call->reply_data = talloc(call, TDB_DATA);
430 if (call->reply_data == NULL) {
431 return -1;
433 call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
434 call->reply_data->dptr = talloc_size(call->reply_data, call->reply_data->dsize);
435 if (call->reply_data->dptr == NULL) {
436 return -1;
438 memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
439 memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
441 return 0;