4 Copyright (C) Andrew Tridgell 2006
5 Copyright (C) Ronnie sahlberg 2011
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "system/network.h"
23 #include "system/filesys.h"
27 #include "lib/tdb_wrap/tdb_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/util/debug.h"
31 #include "ctdb_private.h"
33 #include "common/common.h"
34 #include "common/logging.h"
38 * Calculate tdb flags based on databse type
40 int ctdb_db_tdb_flags(uint8_t db_flags
, bool with_valgrind
, bool with_mutex
)
44 if (db_flags
& CTDB_DB_FLAGS_PERSISTENT
) {
45 tdb_flags
= TDB_DEFAULT
;
47 } else if (db_flags
& CTDB_DB_FLAGS_REPLICATED
) {
48 tdb_flags
= TDB_NOSYNC
|
50 TDB_INCOMPATIBLE_HASH
;
53 tdb_flags
= TDB_NOSYNC
|
55 TDB_INCOMPATIBLE_HASH
;
57 #ifdef TDB_MUTEX_LOCKING
58 if (with_mutex
&& tdb_runtime_check_for_robust_mutexes()) {
59 tdb_flags
|= TDB_MUTEX_LOCKING
;
65 tdb_flags
|= TDB_DISALLOW_NESTING
;
67 tdb_flags
|= TDB_NOMMAP
;
74 find an attached ctdb_db handle given a name
76 struct ctdb_db_context
*ctdb_db_handle(struct ctdb_context
*ctdb
, const char *name
)
78 struct ctdb_db_context
*tmp_db
;
79 for (tmp_db
=ctdb
->db_list
;tmp_db
;tmp_db
=tmp_db
->next
) {
80 if (strcmp(name
, tmp_db
->db_name
) == 0) {
87 bool ctdb_db_persistent(struct ctdb_db_context
*ctdb_db
)
89 if (ctdb_db
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
) {
95 bool ctdb_db_replicated(struct ctdb_db_context
*ctdb_db
)
97 if (ctdb_db
->db_flags
& CTDB_DB_FLAGS_REPLICATED
) {
103 bool ctdb_db_volatile(struct ctdb_db_context
*ctdb_db
)
105 if ((ctdb_db
->db_flags
& CTDB_DB_FLAGS_PERSISTENT
) ||
106 (ctdb_db
->db_flags
& CTDB_DB_FLAGS_REPLICATED
)) {
112 bool ctdb_db_readonly(struct ctdb_db_context
*ctdb_db
)
114 if (ctdb_db
->db_flags
& CTDB_DB_FLAGS_READONLY
) {
120 void ctdb_db_set_readonly(struct ctdb_db_context
*ctdb_db
)
122 ctdb_db
->db_flags
|= CTDB_DB_FLAGS_READONLY
;
125 void ctdb_db_reset_readonly(struct ctdb_db_context
*ctdb_db
)
127 ctdb_db
->db_flags
&= ~CTDB_DB_FLAGS_READONLY
;
130 bool ctdb_db_sticky(struct ctdb_db_context
*ctdb_db
)
132 if (ctdb_db
->db_flags
& CTDB_DB_FLAGS_STICKY
) {
138 void ctdb_db_set_sticky(struct ctdb_db_context
*ctdb_db
)
140 ctdb_db
->db_flags
|= CTDB_DB_FLAGS_STICKY
;
144 return the lmaster given a key
146 uint32_t ctdb_lmaster(struct ctdb_context
*ctdb
, const TDB_DATA
*key
)
148 uint32_t idx
, lmaster
;
150 idx
= ctdb_hash(key
) % ctdb
->vnn_map
->size
;
151 lmaster
= ctdb
->vnn_map
->map
[idx
];
158 construct an initial header for a record with no ltdb header yet
160 static void ltdb_initial_header(struct ctdb_db_context
*ctdb_db
,
162 struct ctdb_ltdb_header
*header
)
164 ZERO_STRUCTP(header
);
165 /* initial dmaster is the lmaster */
166 header
->dmaster
= ctdb_lmaster(ctdb_db
->ctdb
, &key
);
167 header
->flags
= CTDB_REC_FLAG_AUTOMATIC
;
170 struct ctdb_ltdb_fetch_state
{
171 struct ctdb_ltdb_header
*header
;
178 static int ctdb_ltdb_fetch_fn(TDB_DATA key
, TDB_DATA data
, void *private_data
)
180 struct ctdb_ltdb_fetch_state
*state
= private_data
;
181 struct ctdb_ltdb_header
*header
= state
->header
;
182 TDB_DATA
*dstdata
= state
->data
;
184 if (data
.dsize
< sizeof(*header
)) {
189 memcpy(header
, data
.dptr
, sizeof(*header
));
191 if (dstdata
!= NULL
) {
192 dstdata
->dsize
= data
.dsize
- sizeof(struct ctdb_ltdb_header
);
193 dstdata
->dptr
= talloc_memdup(
195 data
.dptr
+ sizeof(struct ctdb_ltdb_header
),
197 if (dstdata
->dptr
== NULL
) {
206 fetch a record from the ltdb, separating out the header information
207 and returning the body of the record. A valid (initial) header is
208 returned if the record is not present
210 int ctdb_ltdb_fetch(struct ctdb_db_context
*ctdb_db
,
211 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
212 TALLOC_CTX
*mem_ctx
, TDB_DATA
*data
)
214 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
215 struct ctdb_ltdb_fetch_state state
= {
223 ret
= tdb_parse_record(
224 ctdb_db
->ltdb
->tdb
, key
, ctdb_ltdb_fetch_fn
, &state
);
227 enum TDB_ERROR err
= tdb_error(ctdb_db
->ltdb
->tdb
);
228 if (err
!= TDB_ERR_NOEXIST
) {
233 if (state
.ret
!= 0) {
234 DBG_DEBUG("ctdb_ltdb_fetch_fn failed\n");
246 if (ctdb
->vnn_map
== NULL
) {
247 /* called from the client */
248 header
->dmaster
= (uint32_t)-1;
252 ltdb_initial_header(ctdb_db
, key
, header
);
253 if (ctdb_db_persistent(ctdb_db
) ||
254 header
->dmaster
== ctdb_db
->ctdb
->pnn
) {
256 ret
= ctdb_ltdb_store(ctdb_db
, key
, header
, tdb_null
);
258 DBG_NOTICE("failed to store initial header\n");
266 write a record to a normal database
268 int ctdb_ltdb_store(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
,
269 struct ctdb_ltdb_header
*header
, TDB_DATA data
)
271 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
273 uint32_t hsize
= sizeof(struct ctdb_ltdb_header
);
276 if (ctdb_db
->ctdb_ltdb_store_fn
) {
277 return ctdb_db
->ctdb_ltdb_store_fn(ctdb_db
, key
, header
, data
);
280 if (ctdb
->flags
& CTDB_FLAG_TORTURE
) {
282 struct ctdb_ltdb_header
*h2
;
284 old
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
285 h2
= (struct ctdb_ltdb_header
*)old
.dptr
;
286 if (old
.dptr
!= NULL
&& old
.dsize
>= hsize
&&
287 h2
->rsn
> header
->rsn
) {
289 ("RSN regression! %"PRIu64
" %"PRIu64
"\n",
290 h2
->rsn
, header
->rsn
));
292 if (old
.dptr
!= NULL
) {
297 rec
[0].dsize
= hsize
;
298 rec
[0].dptr
= (uint8_t *)header
;
300 rec
[1].dsize
= data
.dsize
;
301 rec
[1].dptr
= data
.dptr
;
303 ret
= tdb_storev(ctdb_db
->ltdb
->tdb
, key
, rec
, 2, TDB_REPLACE
);
305 DEBUG(DEBUG_ERR
, (__location__
" Failed to store dynamic data\n"));
312 lock a record in the ltdb, given a key
314 int ctdb_ltdb_lock(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
316 return tdb_chainlock(ctdb_db
->ltdb
->tdb
, key
);
320 unlock a record in the ltdb, given a key
322 int ctdb_ltdb_unlock(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
324 int ret
= tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
326 DEBUG(DEBUG_ERR
,("tdb_chainunlock failed on db %s [%s]\n", ctdb_db
->db_name
, tdb_errorstr(ctdb_db
->ltdb
->tdb
)));
333 delete a record from a normal database
335 int ctdb_ltdb_delete(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
337 if (! ctdb_db_volatile(ctdb_db
)) {
339 ("Ignored deletion of empty record from "
340 "non-volatile database\n"));
343 if (tdb_delete(ctdb_db
->ltdb
->tdb
, key
) != 0) {
344 DEBUG(DEBUG_ERR
,("Failed to delete empty record."));
350 int ctdb_trackingdb_add_pnn(struct ctdb_context
*ctdb
, TDB_DATA
*data
, uint32_t pnn
)
352 unsigned int byte_pos
= pnn
/ 8;
353 unsigned char bit_mask
= 1 << (pnn
% 8);
355 if (byte_pos
+ 1 > data
->dsize
) {
358 buf
= malloc(byte_pos
+ 1);
359 memset(buf
, 0, byte_pos
+ 1);
361 DEBUG(DEBUG_ERR
, ("Out of memory when allocating buffer of %d bytes for trackingdb\n", byte_pos
+ 1));
364 if (data
->dptr
!= NULL
) {
365 memcpy(buf
, data
->dptr
, data
->dsize
);
368 data
->dptr
= (uint8_t *)buf
;
369 data
->dsize
= byte_pos
+ 1;
372 data
->dptr
[byte_pos
] |= bit_mask
;
376 void ctdb_trackingdb_traverse(struct ctdb_context
*ctdb
, TDB_DATA data
, ctdb_trackingdb_cb cb
, void *private_data
)
380 for(i
= 0; i
< data
.dsize
; i
++) {
383 for (j
=0; j
<8; j
++) {
386 if (data
.dptr
[i
] & mask
) {
387 cb(ctdb
, i
* 8 + j
, private_data
);
394 this is the dummy null procedure that all databases support
396 int ctdb_null_func(struct ctdb_call_info
*call
)
402 this is a plain fetch procedure that all databases support
404 int ctdb_fetch_func(struct ctdb_call_info
*call
)
406 call
->reply_data
= &call
->record_data
;
411 this is a plain fetch procedure that all databases support
412 this returns the full record including the ltdb header
414 int ctdb_fetch_with_header_func(struct ctdb_call_info
*call
)
416 call
->reply_data
= talloc(call
, TDB_DATA
);
417 if (call
->reply_data
== NULL
) {
420 call
->reply_data
->dsize
= sizeof(struct ctdb_ltdb_header
) + call
->record_data
.dsize
;
421 call
->reply_data
->dptr
= talloc_size(call
->reply_data
, call
->reply_data
->dsize
);
422 if (call
->reply_data
->dptr
== NULL
) {
425 memcpy(call
->reply_data
->dptr
, call
->header
, sizeof(struct ctdb_ltdb_header
));
426 memcpy(&call
->reply_data
->dptr
[sizeof(struct ctdb_ltdb_header
)], call
->record_data
.dptr
, call
->record_data
.dsize
);