2 Unix SMB/CIFS implementation.
3 Database interface wrapper around ctdbd
4 Copyright (C) Volker Lendecke 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #ifdef CLUSTER_SUPPORT
26 #include "ctdb_private.h"
/*
 * NOTE(review): the enclosing struct header is not visible in this view.
 * Judging by the ctx->wtdb and ctx->conn accesses made through
 * "struct db_ctdb_ctx *" later in the file, these appear to be members of
 * struct db_ctdb_ctx -- confirm against the full file.
 */
/* Wrapped tdb handle; its ->tdb member is what the tdb_* calls operate on. */
29 struct tdb_wrap
*wtdb
;
/* Private ctdbd connection, filled in on demand by db_ctdbd_conn(). */
31 struct ctdbd_connection
*conn
;
/*
 * NOTE(review): the enclosing struct header is not visible in this view.
 * Judging by the crec->ctdb_ctx and crec->header accesses made through
 * "struct db_ctdb_rec *" later in the file, these appear to be members of
 * struct db_ctdb_rec -- confirm against the full file.
 */
/* Database context this record belongs to (set in db_ctdb_fetch_locked). */
35 struct db_ctdb_ctx
*ctdb_ctx
;
/* Copy of the header that prefixes the record payload in the local tdb. */
36 struct ctdb_ltdb_header header
;
/*
 * Forward declaration: db_ctdbd_conn() is defined further down but is
 * already needed by the fetch and open functions that precede it.
 */
39 static struct ctdbd_connection
*db_ctdbd_conn(struct db_ctdb_ctx
*ctx
);
/*
 * Store callback installed on records handed out by db_ctdb_fetch_locked().
 *
 * Builds a temporary buffer holding the record header (crec->header)
 * followed by the caller supplied data and writes it to the local tdb
 * under rec->key.
 *
 * rec:  record whose private_data is a struct db_ctdb_rec
 * data: payload to store after the header
 * flag: NOTE(review): never consulted in the lines visible here;
 *       tdb_store() is always called with TDB_REPLACE
 *
 * Returns NT_STATUS_NO_MEMORY if the buffer cannot be allocated,
 * NT_STATUS_OK if tdb_store() returns 0 and
 * NT_STATUS_INTERNAL_DB_CORRUPTION otherwise.
 */
41 static NTSTATUS
db_ctdb_store(struct db_record
*rec
, TDB_DATA data
, int flag
)
/* Type-checked access to the per-record private state. */
43 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
44 rec
->private_data
, struct db_ctdb_rec
);
/* On-disk size is header plus payload. */
48 cdata
.dsize
= sizeof(crec
->header
) + data
.dsize
;
50 if (!(cdata
.dptr
= SMB_MALLOC_ARRAY(uint8
, cdata
.dsize
))) {
51 return NT_STATUS_NO_MEMORY
;
/* Header first ... */
54 memcpy(cdata
.dptr
, &crec
->header
, sizeof(crec
->header
));
/* ... then the payload directly behind it. */
55 memcpy(cdata
.dptr
+ sizeof(crec
->header
), data
.dptr
, data
.dsize
);
57 ret
= tdb_store(crec
->ctdb_ctx
->wtdb
->tdb
, rec
->key
, cdata
, TDB_REPLACE
);
/* The combined buffer was only needed for the tdb_store() call. */
59 SAFE_FREE(cdata
.dptr
);
/* Any non-zero tdb_store() result is reported as database corruption. */
61 return (ret
== 0) ? NT_STATUS_OK
: NT_STATUS_INTERNAL_DB_CORRUPTION
;
/*
 * Delete callback installed on records handed out by db_ctdb_fetch_locked().
 *
 * The record is not removed from the local tdb. Instead it is overwritten
 * with just the header (crec->header) and no payload.
 *
 * Returns NT_STATUS_OK if tdb_store() returns 0 and
 * NT_STATUS_INTERNAL_DB_CORRUPTION otherwise.
 */
64 static NTSTATUS
db_ctdb_delete(struct db_record
*rec
)
/* Type-checked access to the per-record private state. */
66 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
67 rec
->private_data
, struct db_ctdb_rec
);
72 * We have to store the header with empty data. TODO: Fix the
/* Point the stored value straight at the header; nothing is copied. */
76 data
.dptr
= (uint8
*)&crec
->header
;
77 data
.dsize
= sizeof(crec
->header
);
79 ret
= tdb_store(crec
->ctdb_ctx
->wtdb
->tdb
, rec
->key
, data
, TDB_REPLACE
);
/* Any non-zero tdb_store() result is reported as database corruption. */
81 return (ret
== 0) ? NT_STATUS_OK
: NT_STATUS_INTERNAL_DB_CORRUPTION
;
/*
 * talloc destructor for records returned by db_ctdb_fetch_locked()
 * (registered there via talloc_set_destructor).
 *
 * Releases the tdb chain lock held on the record key and logs at level 0
 * if the unlock fails.
 *
 * NOTE(review): the return statement is not visible in this view.
 */
84 static int db_ctdb_record_destr(struct db_record
* data
)
/* Type-checked access to the per-record private state. */
86 struct db_ctdb_rec
*crec
= talloc_get_type_abort(
87 data
->private_data
, struct db_ctdb_rec
);
/* The hex string is allocated on the record itself ("data"). */
89 DEBUG(10, ("Unlocking key %s\n",
90 hex_encode(data
, (unsigned char *)data
->key
.dptr
,
/* Drop the chain lock taken by tdb_chainlock() in db_ctdb_fetch_locked(). */
93 if (tdb_chainunlock(crec
->ctdb_ctx
->wtdb
->tdb
, data
->key
) != 0) {
94 DEBUG(0, ("tdb_chainunlock failed\n"));
/*
 * Fetch a record with its tdb chain lock held.
 *
 * Visible steps:
 *  1. allocate a struct db_record on mem_ctx and a zeroed
 *     struct db_ctdb_rec below it, and duplicate the key onto the record;
 *  2. take a blocking tdb chain lock on the key and install the
 *     store/delete callbacks plus the unlocking destructor;
 *  3. fetch the raw record from the local tdb; if it is missing, shorter
 *     than a ctdb_ltdb_header, or its dmaster is not get_my_vnn(), drop the
 *     lock, disarm the destructor and ask ctdbd to migrate the record;
 *  4. otherwise copy the header into crec->header and duplicate the
 *     payload (if any) into result->value.
 *
 * NOTE(review): the mem_ctx and key parameter declarations, the retry
 * label/goto and every return statement fall on lines that are not visible
 * in this view; presumably the function returns "result" on success and
 * NULL on failure -- confirm against the full file.
 */
101 static struct db_record
*db_ctdb_fetch_locked(struct db_context
*db
,
105 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
107 struct db_record
*result
;
108 struct db_ctdb_rec
*crec
;
/* The record itself hangs off the caller supplied memory context. */
112 if (!(result
= talloc(mem_ctx
, struct db_record
))) {
113 DEBUG(0, ("talloc failed\n"));
/* Private state is a talloc child of the record and starts out zeroed. */
117 if (!(crec
= TALLOC_ZERO_P(result
, struct db_ctdb_rec
))) {
118 DEBUG(0, ("talloc failed\n"));
123 result
->private_data
= (void *)crec
;
124 crec
->ctdb_ctx
= ctx
;
/* Keep a private copy of the key, owned by the record. */
126 result
->key
.dsize
= key
.dsize
;
127 result
->key
.dptr
= (uint8
*)talloc_memdup(result
, key
.dptr
, key
.dsize
);
128 if (result
->key
.dptr
== NULL
) {
129 DEBUG(0, ("talloc failed\n"));
135 * Do a blocking lock on the record
139 DEBUG(10, ("Locking key %s\n",
140 hex_encode(result
, (unsigned char *)key
.dptr
,
143 if (tdb_chainlock(ctx
->wtdb
->tdb
, key
) != 0) {
144 DEBUG(3, ("tdb_chainlock failed\n"));
/*
 * From here on the lock is held: freeing the record runs
 * db_ctdb_record_destr(), which unlocks the chain again.
 */
149 result
->store
= db_ctdb_store
;
150 result
->delete_rec
= db_ctdb_delete
;
151 talloc_set_destructor(result
, db_ctdb_record_destr
);
153 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
156 * See if we have a valid record and we are the dmaster. If so, we can
157 * take the shortcut and just return it.
160 if ((ctdb_data
.dptr
== NULL
) ||
161 (ctdb_data
.dsize
< sizeof(struct ctdb_ltdb_header
)) ||
162 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
!= get_my_vnn()
/*
 * NOTE(review): the lines immediately before and after the next clause are
 * not visible here; confirm that this random() test is compiled out by a
 * preprocessor guard and is not active in production builds.
 */
164 || (random() % 2 != 0)
/*
 * Not usable locally: release the data and the lock, and disarm the
 * destructor so the chain is not unlocked a second time.
 */
167 SAFE_FREE(ctdb_data
.dptr
);
168 tdb_chainunlock(ctx
->wtdb
->tdb
, key
);
169 talloc_set_destructor(result
, NULL
);
171 DEBUG(10, ("ctdb_data.dptr = %p, dmaster = %u (%u)\n",
172 ctdb_data
.dptr
, ctdb_data
.dptr
?
173 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
: -1,
/* Ask ctdbd to migrate the record for this key. */
176 status
= ctdbd_migrate(db_ctdbd_conn(ctx
), ctx
->db_id
, key
);
177 if (!NT_STATUS_IS_OK(status
)) {
178 DEBUG(5, ("ctdb_migrate failed: %s\n",
183 /* now it is migrated, try again */
/* Shortcut path: split the raw tdb value into header and payload. */
187 memcpy(&crec
->header
, ctdb_data
.dptr
, sizeof(crec
->header
));
189 result
->value
.dsize
= ctdb_data
.dsize
- sizeof(crec
->header
);
190 result
->value
.dptr
= NULL
;
/* Only duplicate the payload when there is one; empty records keep NULL. */
192 if ((result
->value
.dsize
!= 0)
193 && !(result
->value
.dptr
= (uint8
*)talloc_memdup(
194 result
, ctdb_data
.dptr
+ sizeof(crec
->header
),
195 result
->value
.dsize
))) {
196 DEBUG(0, ("talloc failed\n"));
/* The buffer returned by tdb_fetch() is no longer needed. */
200 SAFE_FREE(ctdb_data
.dptr
);
/*
 * Unlocked fetch of a record, without migrating it.
 *
 * First tries the local tdb: if the record exists, is at least as large as
 * a ctdb_ltdb_header and its dmaster equals get_my_vnn(), the payload
 * behind the header is copied onto mem_ctx and handed back through *data.
 * Otherwise the request is passed to ctdbd via ctdbd_fetch().
 *
 * NOTE(review): the return statements are not visible in this view, so the
 * exact return values for success and failure cannot be stated here.
 */
206 fetch (unlocked, no migration) operation on ctdb
208 static int db_ctdb_fetch(struct db_context
*db
, TALLOC_CTX
*mem_ctx
,
209 TDB_DATA key
, TDB_DATA
*data
)
211 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
216 /* try a direct fetch */
217 ctdb_data
= tdb_fetch(ctx
->wtdb
->tdb
, key
);
220 * See if we have a valid record and we are the dmaster. If so, we can
221 * take the shortcut and just return it.
223 if ((ctdb_data
.dptr
!= NULL
) &&
224 (ctdb_data
.dsize
>= sizeof(struct ctdb_ltdb_header
)) &&
225 ((struct ctdb_ltdb_header
*)ctdb_data
.dptr
)->dmaster
== get_my_vnn()) {
226 /* we are the dmaster - avoid the ctdb protocol op */
/* Payload size is whatever follows the header. */
228 data
->dsize
= ctdb_data
.dsize
- sizeof(struct ctdb_ltdb_header
);
/* Header-only record: nothing to copy, just release the tdb buffer. */
229 if (data
->dsize
== 0) {
230 SAFE_FREE(ctdb_data
.dptr
);
/* Copy the payload onto the caller supplied memory context. */
235 data
->dptr
= (uint8
*)talloc_memdup(
236 mem_ctx
, ctdb_data
.dptr
+sizeof(struct ctdb_ltdb_header
),
239 SAFE_FREE(ctdb_data
.dptr
);
/* talloc_memdup() failure is detected after the tdb buffer is freed. */
241 if (data
->dptr
== NULL
) {
/* Local shortcut not possible: drop whatever tdb_fetch() returned. */
247 SAFE_FREE(ctdb_data
.dptr
);
249 /* we weren't able to get it locally - ask ctdb to fetch it for us */
250 status
= ctdbd_fetch(db_ctdbd_conn(ctx
), ctx
->db_id
, key
, mem_ctx
,
252 if (!NT_STATUS_IS_OK(status
)) {
253 DEBUG(5, ("ctdbd_fetch failed: %s\n", nt_errstr(status
)));
/*
 * State handed to the ctdbd_traverse() callbacks as their private pointer.
 *
 * NOTE(review): the callbacks also read state->private_data, so a further
 * member is declared on a line not visible in this view.
 */
260 struct traverse_state
{
/* Database being traversed. */
261 struct db_context
*db
;
/* Callback invoked for each record. */
262 int (*fn
)(struct db_record
*rec
, void *private_data
);
/*
 * Per-record callback used by db_ctdb_traverse().
 *
 * Re-fetches the record with its lock held through db_ctdb_fetch_locked()
 * and passes it to state->fn, but only if the fetch succeeded and the
 * record has a non-empty value. The "data" argument is not used in the
 * visible lines. The return value of state->fn is ignored.
 *
 * NOTE(review): the result of talloc_new() is not checked before use.
 */
266 static void traverse_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
268 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
269 struct db_record
*rec
;
/* Scratch context: freeing it below also frees (and so unlocks) rec. */
270 TALLOC_CTX
*tmp_ctx
= talloc_new(state
->db
);
271 /* we have to give them a locked record to prevent races */
272 rec
= db_ctdb_fetch_locked(state
->db
, tmp_ctx
, key
);
/* Header-only (empty) records are skipped. */
273 if (rec
&& rec
->value
.dsize
> 0) {
274 state
->fn(rec
, state
->private_data
);
276 talloc_free(tmp_ctx
);
/*
 * Read/write traverse: walks the database through ctdbd_traverse() and has
 * traverse_callback() hand each record to "fn" with its lock held.
 *
 * NOTE(review): the remaining parameter declarations, the other state
 * member assignments and the return statement are not visible in this view.
 */
279 static int db_ctdb_traverse(struct db_context
*db
,
280 int (*fn
)(struct db_record
*rec
,
284 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
/* Lives on the stack; only valid for the duration of ctdbd_traverse(). */
286 struct traverse_state state
;
290 state
.private_data
= private_data
;
292 ctdbd_traverse(ctx
->db_id
, traverse_callback
, &state
);
/*
 * Store callback for records handed out during a read-only traverse:
 * always refuses with NT_STATUS_MEDIA_WRITE_PROTECTED and ignores its
 * arguments.
 */
296 static NTSTATUS
db_ctdb_store_deny(struct db_record
*rec
, TDB_DATA data
, int flag
)
298 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
/*
 * Delete callback for records handed out during a read-only traverse:
 * always refuses with NT_STATUS_MEDIA_WRITE_PROTECTED and ignores its
 * argument.
 */
301 static NTSTATUS
db_ctdb_delete_deny(struct db_record
*rec
)
303 return NT_STATUS_MEDIA_WRITE_PROTECTED
;
/*
 * Per-record callback used by db_ctdb_traverse_read().
 *
 * Builds a temporary struct db_record on the stack whose store and delete
 * callbacks always fail, then passes it to state->fn. No lock is taken in
 * the visible lines. The return value of state->fn is ignored.
 *
 * NOTE(review): the lines that presumably fill in rec.key and rec.value
 * from the "key" and "data" arguments are not visible in this view.
 */
306 static void traverse_read_callback(TDB_DATA key
, TDB_DATA data
, void *private_data
)
308 struct traverse_state
*state
= (struct traverse_state
*)private_data
;
/* Stack record: only valid for the duration of the state->fn call. */
309 struct db_record rec
;
/* Writes through this record are rejected. */
312 rec
.store
= db_ctdb_store_deny
;
313 rec
.delete_rec
= db_ctdb_delete_deny
;
/* Unlike locked records, private_data here points at the db_context. */
314 rec
.private_data
= state
->db
;
315 state
->fn(&rec
, state
->private_data
);
/*
 * Read-only traverse: walks the database through ctdbd_traverse() and has
 * traverse_read_callback() hand each record to "fn" as a write-protected
 * record.
 *
 * NOTE(review): the remaining parameter declarations, the other state
 * member assignments and the return statement are not visible in this view.
 */
318 static int db_ctdb_traverse_read(struct db_context
*db
,
319 int (*fn
)(struct db_record
*rec
,
323 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
/* Lives on the stack; only valid for the duration of ctdbd_traverse(). */
325 struct traverse_state state
;
329 state
.private_data
= private_data
;
331 ctdbd_traverse(ctx
->db_id
, traverse_read_callback
, &state
);
/*
 * Return the sequence number of the local tdb backing this database, as
 * reported by tdb_get_seqnum().
 */
335 static int db_ctdb_get_seqnum(struct db_context
*db
)
337 struct db_ctdb_ctx
*ctx
= talloc_get_type_abort(db
->private_data
,
339 return tdb_get_seqnum(ctx
->wtdb
->tdb
);
343 * Get the ctdbd connection for a database. If possible, re-use the messaging
346 static struct ctdbd_connection
*db_ctdbd_conn(struct db_ctdb_ctx
*ctx
)
348 struct ctdbd_connection
*result
;
350 result
= messaging_ctdbd_connection();
352 if (result
!= NULL
) {
354 if (ctx
->conn
== NULL
) {
356 * Someone has initialized messaging since we
357 * initialized our own connection, we don't need it
360 TALLOC_FREE(ctx
->conn
);
366 if (ctx
->conn
== NULL
) {
367 ctdbd_init_connection(ctx
, &ctx
->conn
);
368 set_my_vnn(ctdbd_vnn(ctx
->conn
));
/*
 * Open a ctdb backed database (clustering build).
 *
 * Visible steps:
 *  1. bail out if lp_clustering() is false;
 *  2. allocate a zeroed struct db_context on mem_ctx and a
 *     struct db_ctdb_ctx below it;
 *  3. attach to the database in ctdbd (ctdbd_db_attach) to obtain db_id;
 *  4. ask ctdbd for the path of the local tdb (ctdbd_dbpath) and open it
 *     with tdb_wrap_open();
 *  5. install the fetch/traverse/seqnum callbacks on the db_context.
 *
 * NOTE(review): the "name" parameter declaration, the error-path cleanup
 * and every return statement are on lines not visible in this view.
 * NOTE(review): in the visible lines open_flags and mode are not forwarded;
 * tdb_wrap_open() is called with O_RDWR and 0.
 */
374 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
376 int hash_size
, int tdb_flags
,
377 int open_flags
, mode_t mode
)
379 struct db_context
*result
;
380 struct db_ctdb_ctx
*db_ctdb
;
384 if (!lp_clustering()) {
385 DEBUG(10, ("Clustering disabled -- no ctdb\n"));
389 if (!(result
= TALLOC_ZERO_P(mem_ctx
, struct db_context
))) {
390 DEBUG(0, ("talloc failed\n"));
/* Not zero-initialised: conn is therefore reset explicitly below. */
395 if (!(db_ctdb
= TALLOC_P(result
, struct db_ctdb_ctx
))) {
396 DEBUG(0, ("talloc failed\n"));
/* db_ctdbd_conn() tests conn against NULL, so it must start out as NULL. */
401 db_ctdb
->conn
= NULL
;
/* Attach with the tdb_flags as passed in by the caller. */
403 status
= ctdbd_db_attach(db_ctdbd_conn(db_ctdb
), name
,
404 &db_ctdb
->db_id
, tdb_flags
);
406 if (!NT_STATUS_IS_OK(status
)) {
407 DEBUG(0, ("ctdbd_db_attach failed for %s: %s\n", name
,
/* Path string is allocated below db_ctdb and freed once the tdb is open. */
413 db_path
= ctdbd_dbpath(db_ctdbd_conn(db_ctdb
), db_ctdb
,
416 /* only pass through specific flags */
417 tdb_flags
&= TDB_SEQNUM
;
419 db_ctdb
->wtdb
= tdb_wrap_open(db_ctdb
, db_path
, hash_size
, tdb_flags
, O_RDWR
, 0);
420 if (db_ctdb
->wtdb
== NULL
) {
421 DEBUG(0, ("Could not open tdb %s: %s\n", db_path
, strerror(errno
)));
425 talloc_free(db_path
);
/* Wire up the backend callbacks. */
427 result
->private_data
= (void *)db_ctdb
;
428 result
->fetch_locked
= db_ctdb_fetch_locked
;
429 result
->fetch
= db_ctdb_fetch
;
430 result
->traverse
= db_ctdb_traverse
;
431 result
->traverse_read
= db_ctdb_traverse_read
;
432 result
->get_seqnum
= db_ctdb_get_seqnum
;
434 DEBUG(3,("db_open_ctdb: opened database '%s' with dbid 0x%x\n",
435 name
, db_ctdb
->db_id
));
/*
 * Second definition of db_open_ctdb(); it only logs at level 0 that no
 * clustering support is compiled in.
 *
 * NOTE(review): the preprocessor lines separating this from the definition
 * above, and the return statement, are not visible in this view; presumably
 * this is the stub used when CLUSTER_SUPPORT is undefined -- confirm.
 */
442 struct db_context
*db_open_ctdb(TALLOC_CTX
*mem_ctx
,
444 int hash_size
, int tdb_flags
,
445 int open_flags
, mode_t mode
)
447 DEBUG(0, ("no clustering compiled in\n"));