/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
#include "dlm_internal.h"
#include "lockspace.h"
#include "member.h"
#include "lowcomms.h"
#include "rcom.h"
#include "config.h"
#include "memory.h"
#include "recover.h"
#include "util.h"
#include "lock.h"
#include "dir.h"
static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de)
{
	spin_lock(&ls->ls_recover_list_lock);
	list_add(&de->list, &ls->ls_recover_list);
	spin_unlock(&ls->ls_recover_list_lock);
}
static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len)
{
	int found = 0;
	struct dlm_direntry *de;

	spin_lock(&ls->ls_recover_list_lock);
	list_for_each_entry(de, &ls->ls_recover_list, list) {
		if (de->length == len) {
			list_del(&de->list);
			de->master_nodeid = 0;
			memset(de->name, 0, len);
			found = 1;
			break;
		}
	}
	spin_unlock(&ls->ls_recover_list_lock);

	if (!found)
		de = allocate_direntry(ls, len);
	return de;
}
void dlm_clear_free_entries(struct dlm_ls *ls)
{
	struct dlm_direntry *de;

	spin_lock(&ls->ls_recover_list_lock);
	while (!list_empty(&ls->ls_recover_list)) {
		de = list_entry(ls->ls_recover_list.next, struct dlm_direntry,
				list);
		list_del(&de->list);
		free_direntry(de);
	}
	spin_unlock(&ls->ls_recover_list_lock);
}
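
/*
 * Note: the free list is a recycling scheme for recovery.  dlm_dir_clear()
 * moves existing entries onto ls_recover_list via put_free_de(),
 * dlm_recover_directory() reuses them through get_free_de() (falling back
 * to allocate_direntry() when no entry of the right length is queued), and
 * dlm_clear_free_entries() frees whatever recovery did not reuse.
 */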
/*
 * We use the upper 16 bits of the hash value to select the directory node.
 * Low bits are used for distribution of rsb's among hash buckets on each node.
 *
 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
 * num_nodes to the hash value.  This value in the desired range is used as an
 * offset into the sorted list of nodeid's to give the particular nodeid.
 */

int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
{
	struct list_head *tmp;
	struct dlm_member *memb = NULL;
	uint32_t node, n = 0;
	int nodeid;

	if (ls->ls_num_nodes == 1) {
		nodeid = dlm_our_nodeid();
		goto out;
	}

	if (ls->ls_node_array) {
		node = (hash >> 16) % ls->ls_total_weight;
		nodeid = ls->ls_node_array[node];
		goto out;
	}

	/* make_member_array() failed to kmalloc ls_node_array... */

	node = (hash >> 16) % ls->ls_num_nodes;

	list_for_each(tmp, &ls->ls_nodes) {
		if (n++ != node)
			continue;
		memb = list_entry(tmp, struct dlm_member, list);
		break;
	}

	DLM_ASSERT(memb, printk("num_nodes=%u n=%u node=%u\n",
				ls->ls_num_nodes, n, node););
	nodeid = memb->nodeid;
 out:
	return nodeid;
}
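
/*
 * Illustrative example (values assumed, not from the original source):
 * with three nodes of weight 1, ls_total_weight is 3 and ls_node_array has
 * three entries.  For hash 0x12345678 the upper 16 bits are 0x1234 (4660),
 * 4660 % 3 == 1, so ls_node_array[1] holds the directory nodeid.
 */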
int dlm_dir_nodeid(struct dlm_rsb *r)
{
	return dlm_hash2nodeid(r->res_ls, r->res_hash);
}
static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len)
{
	uint32_t val;

	val = jhash(name, len, 0);
	val &= (ls->ls_dirtbl_size - 1);

	return val;
}
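
/*
 * The mask above always yields a value below ls_dirtbl_size, but the
 * buckets are only filled evenly when ls_dirtbl_size is a power of two.
 */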
static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de)
{
	uint32_t bucket;

	bucket = dir_hash(ls, de->name, de->length);
	list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
}
static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name,
					  int namelen, uint32_t bucket)
{
	struct dlm_direntry *de;

	list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) {
		if (de->length == namelen && !memcmp(name, de->name, namelen))
			goto out;
	}
	de = NULL;
 out:
	return de;
}
void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen)
{
	struct dlm_direntry *de;
	uint32_t bucket;

	bucket = dir_hash(ls, name, namelen);

	write_lock(&ls->ls_dirtbl[bucket].lock);

	de = search_bucket(ls, name, namelen, bucket);

	if (!de) {
		log_error(ls, "remove fr %u none", nodeid);
		goto out;
	}

	if (de->master_nodeid != nodeid) {
		log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid);
		goto out;
	}

	list_del(&de->list);
	free_direntry(de);
 out:
	write_unlock(&ls->ls_dirtbl[bucket].lock);
}
void dlm_dir_clear(struct dlm_ls *ls)
{
	struct list_head *head;
	struct dlm_direntry *de;
	int i;

	DLM_ASSERT(list_empty(&ls->ls_recover_list), );

	for (i = 0; i < ls->ls_dirtbl_size; i++) {
		write_lock(&ls->ls_dirtbl[i].lock);
		head = &ls->ls_dirtbl[i].list;
		while (!list_empty(head)) {
			de = list_entry(head->next, struct dlm_direntry, list);
			list_del(&de->list);
			put_free_de(ls, de);
		}
		write_unlock(&ls->ls_dirtbl[i].lock);
	}
}
int dlm_recover_directory(struct dlm_ls *ls)
{
	struct dlm_member *memb;
	struct dlm_direntry *de;
	char *b, *last_name = NULL;
	int error = -ENOMEM, last_len, count = 0;
	uint16_t namelen;

	log_debug(ls, "dlm_recover_directory");

	if (dlm_no_directory(ls))
		goto out_status;

	dlm_dir_clear(ls);

	last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_KERNEL);
	if (!last_name)
		goto out;

	list_for_each_entry(memb, &ls->ls_nodes, list) {
		memset(last_name, 0, DLM_RESNAME_MAXLEN);
		last_len = 0;

		for (;;) {
			error = dlm_recovery_stopped(ls);
			if (error)
				goto out_free;

			error = dlm_rcom_names(ls, memb->nodeid,
					       last_name, last_len);
			if (error)
				goto out_free;

			schedule();

			/*
			 * pick namelen/name pairs out of received buffer
			 */

			b = ls->ls_recover_buf + sizeof(struct dlm_rcom);

			for (;;) {
				memcpy(&namelen, b, sizeof(uint16_t));
				namelen = be16_to_cpu(namelen);
				b += sizeof(uint16_t);

				/* namelen of 0xFFFF marks end of names for
				   this node; namelen of 0 marks end of the
				   buffer */

				if (namelen == 0xFFFF)
					goto done;
				if (!namelen)
					break;

				error = -ENOMEM;
				de = get_free_de(ls, namelen);
				if (!de)
					goto out_free;

				de->master_nodeid = memb->nodeid;
				de->length = namelen;
				last_len = namelen;
				memcpy(de->name, b, namelen);
				memcpy(last_name, b, namelen);
				b += namelen;

				add_entry_to_hash(ls, de);
				count++;
			}
		}
	 done:
		;
	}

 out_status:
	error = 0;
	dlm_set_recover_status(ls, DLM_RS_DIR);
	log_debug(ls, "dlm_recover_directory %d entries", count);
 out_free:
	kfree(last_name);
 out:
	dlm_clear_free_entries(ls);
	return error;
}
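
/*
 * For reference, the buffer parsed above is a sequence of big-endian
 * (uint16_t namelen, name) pairs, e.g.
 *
 *   0x0004 "r1aa" 0x0007 "resname" 0x0000
 *
 * A trailing 0x0000 means the sender's block was full and another request
 * should be made starting after last_name; 0xFFFF means the remote node has
 * no more names to send.  The sending side of this format is
 * dlm_copy_master_names() below.
 */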
static int get_entry(struct dlm_ls *ls, int nodeid, char *name,
		     int namelen, int *r_nodeid)
{
	struct dlm_direntry *de, *tmp;
	uint32_t bucket;

	bucket = dir_hash(ls, name, namelen);

	write_lock(&ls->ls_dirtbl[bucket].lock);
	de = search_bucket(ls, name, namelen, bucket);
	if (de) {
		*r_nodeid = de->master_nodeid;
		write_unlock(&ls->ls_dirtbl[bucket].lock);
		if (*r_nodeid == nodeid)
			return -EEXIST;
		return 0;
	}

	write_unlock(&ls->ls_dirtbl[bucket].lock);
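
	/*
	 * No entry was found.  The bucket lock is dropped while a new entry
	 * is allocated, so another thread may add one for the same name in
	 * the meantime; after re-taking the lock the bucket is searched
	 * again and any entry found then is used instead.
	 */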
	de = allocate_direntry(ls, namelen);
	if (!de)
		return -ENOMEM;

	de->master_nodeid = nodeid;
	de->length = namelen;
	memcpy(de->name, name, namelen);

	write_lock(&ls->ls_dirtbl[bucket].lock);
	tmp = search_bucket(ls, name, namelen, bucket);
	if (tmp) {
		free_direntry(de);
		de = tmp;
	} else
		list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list);
	*r_nodeid = de->master_nodeid;
	write_unlock(&ls->ls_dirtbl[bucket].lock);
	return 0;
}
int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen,
		   int *r_nodeid)
{
	return get_entry(ls, nodeid, name, namelen, r_nodeid);
}
/* Copy the names of master rsb's into the buffer provided.
   Only select names whose dir node is the given nodeid. */
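
/*
 * inbuf/inlen carry the last name returned in the previous block (empty on
 * the first request), so the walk of ls_root_list can resume where it left
 * off.  outbuf is filled with big-endian (uint16_t namelen, name) pairs,
 * terminated by 0x0000 when the buffer fills up or 0xFFFF once every
 * matching rsb has been sent; dlm_recover_directory() above consumes this
 * format.
 */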
void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
			   char *outbuf, int outlen, int nodeid)
{
	struct list_head *list;
	struct dlm_rsb *start_r = NULL, *r = NULL;
	int offset = 0, start_namelen, error, dir_nodeid;
	char *start_name;
	uint16_t be_namelen;

	/*
	 * Find the rsb where we left off (or start again)
	 */

	start_namelen = inlen;
	start_name = inbuf;

	if (start_namelen > 1) {
		/*
		 * We could also use a find_rsb_root() function here that
		 * searched the ls_root_list.
		 */
		error = dlm_find_rsb(ls, start_name, start_namelen, R_MASTER,
				     &start_r);
		DLM_ASSERT(!error && start_r,
			   printk("error %d\n", error););
		DLM_ASSERT(!list_empty(&start_r->res_root_list),
			   dlm_print_rsb(start_r););
		dlm_put_rsb(start_r);
	}

	/*
	 * Send rsb names for rsb's we're master of and whose directory node
	 * matches the requesting node.
	 */

	down_read(&ls->ls_root_sem);
	if (start_r)
		list = start_r->res_root_list.next;
	else
		list = ls->ls_root_list.next;

	for (offset = 0; list != &ls->ls_root_list; list = list->next) {
		r = list_entry(list, struct dlm_rsb, res_root_list);
		if (r->res_nodeid)
			continue;

		dir_nodeid = dlm_dir_nodeid(r);
		if (dir_nodeid != nodeid)
			continue;

		/*
		 * The block ends when we can't fit the following in the
		 * remaining buffer space:
		 * namelen (uint16_t) +
		 * name (r->res_length) +
		 * end-of-block record 0x0000 (uint16_t)
		 */

		if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
			/* Write end-of-block record */
			be_namelen = 0;
			memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
			offset += sizeof(uint16_t);
			goto out;
		}

		be_namelen = cpu_to_be16(r->res_length);
		memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
		offset += sizeof(uint16_t);
		memcpy(outbuf + offset, r->res_name, r->res_length);
		offset += r->res_length;
	}

	/*
	 * If we've reached the end of the list (and there's room) write a
	 * terminating record.
	 */

	if ((list == &ls->ls_root_list) &&
	    (offset + sizeof(uint16_t) <= outlen)) {
		be_namelen = 0xFFFF;
		memcpy(outbuf + offset, &be_namelen, sizeof(uint16_t));
		offset += sizeof(uint16_t);
	}
 out:
	up_read(&ls->ls_root_sem);
}