4 Copyright (C) Andrew Tridgell 2006
5 Copyright (C) Ronnie sahlberg 2011
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "../include/ctdb_private.h"
26 #include "lib/tdb_wrap/tdb_wrap.h"
27 #include "lib/util/dlinklist.h"
30 find an attached ctdb_db handle given a name
32 struct ctdb_db_context
*ctdb_db_handle(struct ctdb_context
*ctdb
, const char *name
)
34 struct ctdb_db_context
*tmp_db
;
35 for (tmp_db
=ctdb
->db_list
;tmp_db
;tmp_db
=tmp_db
->next
) {
36 if (strcmp(name
, tmp_db
->db_name
) == 0) {
45 return the lmaster given a key
47 uint32_t ctdb_lmaster(struct ctdb_context
*ctdb
, const TDB_DATA
*key
)
49 uint32_t idx
, lmaster
;
51 idx
= ctdb_hash(key
) % ctdb
->vnn_map
->size
;
52 lmaster
= ctdb
->vnn_map
->map
[idx
];
59 construct an initial header for a record with no ltdb header yet
61 static void ltdb_initial_header(struct ctdb_db_context
*ctdb_db
,
63 struct ctdb_ltdb_header
*header
)
66 /* initial dmaster is the lmaster */
67 header
->dmaster
= ctdb_lmaster(ctdb_db
->ctdb
, &key
);
68 header
->flags
= CTDB_REC_FLAG_AUTOMATIC
;
73 fetch a record from the ltdb, separating out the header information
74 and returning the body of the record. A valid (initial) header is
75 returned if the record is not present
77 int ctdb_ltdb_fetch(struct ctdb_db_context
*ctdb_db
,
78 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
79 TALLOC_CTX
*mem_ctx
, TDB_DATA
*data
)
82 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
84 rec
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
85 if (rec
.dsize
< sizeof(*header
)) {
86 /* return an initial header */
87 if (rec
.dptr
) free(rec
.dptr
);
88 if (ctdb
->vnn_map
== NULL
) {
89 /* called from the client */
91 header
->dmaster
= (uint32_t)-1;
94 ltdb_initial_header(ctdb_db
, key
, header
);
98 if (ctdb_db
->persistent
|| header
->dmaster
== ctdb_db
->ctdb
->pnn
) {
99 if (ctdb_ltdb_store(ctdb_db
, key
, header
, tdb_null
) != 0) {
101 (__location__
"failed to store initial header\n"));
107 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
110 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
111 data
->dptr
= talloc_memdup(mem_ctx
,
112 sizeof(struct ctdb_ltdb_header
)+rec
.dptr
,
118 CTDB_NO_MEMORY(ctdb
, data
->dptr
);
125 fetch a record from the ltdb, separating out the header information
126 and returning the body of the record.
127 if the record does not exist, *header will be NULL
130 int ctdb_ltdb_fetch_with_header(struct ctdb_db_context
*ctdb_db
,
131 TDB_DATA key
, struct ctdb_ltdb_header
*header
,
132 TALLOC_CTX
*mem_ctx
, TDB_DATA
*data
)
136 rec
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
137 if (rec
.dsize
< sizeof(*header
)) {
145 *header
= *(struct ctdb_ltdb_header
*)rec
.dptr
;
147 data
->dsize
= rec
.dsize
- sizeof(struct ctdb_ltdb_header
);
148 data
->dptr
= talloc_memdup(mem_ctx
,
149 sizeof(struct ctdb_ltdb_header
)+rec
.dptr
,
160 write a record to a normal database
162 int ctdb_ltdb_store(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
,
163 struct ctdb_ltdb_header
*header
, TDB_DATA data
)
165 struct ctdb_context
*ctdb
= ctdb_db
->ctdb
;
168 bool seqnum_suppressed
= false;
170 if (ctdb_db
->ctdb_ltdb_store_fn
) {
171 return ctdb_db
->ctdb_ltdb_store_fn(ctdb_db
, key
, header
, data
);
174 if (ctdb
->flags
& CTDB_FLAG_TORTURE
) {
175 struct ctdb_ltdb_header
*h2
;
176 rec
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
177 h2
= (struct ctdb_ltdb_header
*)rec
.dptr
;
178 if (rec
.dptr
&& rec
.dsize
>= sizeof(h2
) && h2
->rsn
> header
->rsn
) {
179 DEBUG(DEBUG_CRIT
,("RSN regression! %llu %llu\n",
180 (unsigned long long)h2
->rsn
, (unsigned long long)header
->rsn
));
182 if (rec
.dptr
) free(rec
.dptr
);
185 rec
.dsize
= sizeof(*header
) + data
.dsize
;
186 rec
.dptr
= talloc_size(ctdb
, rec
.dsize
);
187 CTDB_NO_MEMORY(ctdb
, rec
.dptr
);
189 memcpy(rec
.dptr
, header
, sizeof(*header
));
190 memcpy(rec
.dptr
+ sizeof(*header
), data
.dptr
, data
.dsize
);
192 /* Databases with seqnum updates enabled only get their seqnum
193 changes when/if we modify the data */
194 if (ctdb_db
->seqnum_update
!= NULL
) {
196 old
= tdb_fetch(ctdb_db
->ltdb
->tdb
, key
);
198 if ( (old
.dsize
== rec
.dsize
)
199 && !memcmp(old
.dptr
+sizeof(struct ctdb_ltdb_header
),
200 rec
.dptr
+sizeof(struct ctdb_ltdb_header
),
201 rec
.dsize
-sizeof(struct ctdb_ltdb_header
)) ) {
202 tdb_remove_flags(ctdb_db
->ltdb
->tdb
, TDB_SEQNUM
);
203 seqnum_suppressed
= true;
205 if (old
.dptr
) free(old
.dptr
);
207 ret
= tdb_store(ctdb_db
->ltdb
->tdb
, key
, rec
, TDB_REPLACE
);
209 DEBUG(DEBUG_ERR
, (__location__
" Failed to store dynamic data\n"));
211 if (seqnum_suppressed
) {
212 tdb_add_flags(ctdb_db
->ltdb
->tdb
, TDB_SEQNUM
);
215 talloc_free(rec
.dptr
);
221 lock a record in the ltdb, given a key
223 int ctdb_ltdb_lock(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
225 return tdb_chainlock(ctdb_db
->ltdb
->tdb
, key
);
229 unlock a record in the ltdb, given a key
231 int ctdb_ltdb_unlock(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
233 int ret
= tdb_chainunlock(ctdb_db
->ltdb
->tdb
, key
);
235 DEBUG(DEBUG_ERR
,("tdb_chainunlock failed on db %s [%s]\n", ctdb_db
->db_name
, tdb_errorstr(ctdb_db
->ltdb
->tdb
)));
242 delete a record from a normal database
244 int ctdb_ltdb_delete(struct ctdb_db_context
*ctdb_db
, TDB_DATA key
)
246 if (ctdb_db
->persistent
!= 0) {
247 DEBUG(DEBUG_ERR
,("Trying to delete emty record in persistent database\n"));
250 if (tdb_delete(ctdb_db
->ltdb
->tdb
, key
) != 0) {
251 DEBUG(DEBUG_ERR
,("Failed to delete empty record."));
257 int ctdb_trackingdb_add_pnn(struct ctdb_context
*ctdb
, TDB_DATA
*data
, uint32_t pnn
)
259 int byte_pos
= pnn
/ 8;
260 int bit_mask
= 1 << (pnn
% 8);
262 if (byte_pos
+ 1 > data
->dsize
) {
265 buf
= malloc(byte_pos
+ 1);
266 memset(buf
, 0, byte_pos
+ 1);
268 DEBUG(DEBUG_ERR
, ("Out of memory when allocating buffer of %d bytes for trackingdb\n", byte_pos
+ 1));
271 if (data
->dptr
!= NULL
) {
272 memcpy(buf
, data
->dptr
, data
->dsize
);
275 data
->dptr
= (uint8_t *)buf
;
276 data
->dsize
= byte_pos
+ 1;
279 data
->dptr
[byte_pos
] |= bit_mask
;
283 void ctdb_trackingdb_traverse(struct ctdb_context
*ctdb
, TDB_DATA data
, ctdb_trackingdb_cb cb
, void *private_data
)
287 for(i
= 0; i
< data
.dsize
; i
++) {
290 for (j
=0; j
<8; j
++) {
293 if (data
.dptr
[i
] & mask
) {
294 cb(ctdb
, i
* 8 + j
, private_data
);
/*
  Dummy null procedure that all databases support.

  NOTE(review): the body was missing from the listing this block was
  taken from; a no-op that reports success (0) is reconstructed from the
  header comment - confirm against the authoritative file.
*/
int ctdb_null_func(struct ctdb_call_info *call)
{
	return 0;
}
309 this is a plain fetch procedure that all databases support
311 int ctdb_fetch_func(struct ctdb_call_info
*call
)
313 call
->reply_data
= &call
->record_data
;
318 this is a plain fetch procedure that all databases support
319 this returns the full record including the ltdb header
321 int ctdb_fetch_with_header_func(struct ctdb_call_info
*call
)
323 call
->reply_data
= talloc(call
, TDB_DATA
);
324 if (call
->reply_data
== NULL
) {
327 call
->reply_data
->dsize
= sizeof(struct ctdb_ltdb_header
) + call
->record_data
.dsize
;
328 call
->reply_data
->dptr
= talloc_size(call
->reply_data
, call
->reply_data
->dsize
);
329 if (call
->reply_data
->dptr
== NULL
) {
332 memcpy(call
->reply_data
->dptr
, call
->header
, sizeof(struct ctdb_ltdb_header
));
333 memcpy(&call
->reply_data
->dptr
[sizeof(struct ctdb_ltdb_header
)], call
->record_data
.dptr
, call
->record_data
.dsize
);