/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/
/*
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * I don't see any problem with the recv thread executing the locking
 * code on behalf of remote processes as the locking code is
 * short, efficient and never waits.
 */
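/*
 * A rough sketch of the flow described above, using the names defined
 * later in this file (a summary only, not extra machinery):
 *
 *   sk callback (lowcomms_data_ready)
 *      -> read_sockets list, wake dlm_recvd
 *         -> receive_from_sock() -> dlm_process_incoming_buffer()
 *
 *   dlm_lowcomms_get_buffer()/dlm_lowcomms_commit_buffer()
 *      -> write_sockets list, wake dlm_sendd
 *         -> connect_to_sock() if needed, then send_to_sock()
 */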
#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"
#define NODE_INCREMENT 32
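/*
 * The definition of struct cbuf is not part of this fragment; judging
 * from the helpers below it is a power-of-two circular buffer descriptor
 * along these lines (an assumed reconstruction):
 */
struct cbuf {
	unsigned int base;	/* index of first unconsumed byte */
	unsigned int len;	/* number of unconsumed bytes */
	unsigned int mask;	/* buffer size - 1 (size is a power of two) */
};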
static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size - 1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
	return cb->len == 0;
}
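/*
 * Worked example (illustrative only): with a 4096-byte page, mask is
 * 0xfff.  If base = 4000 and len = 200, cbuf_data() returns
 * (4000 + 200) & 0xfff = 104, i.e. the next free byte has wrapped
 * around to near the start of the page.
 */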
/* Maximum number of incoming messages to process before
   doing a cond_resched()
*/
#define MAX_RX_MSG_COUNT 25
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct rw_semaphore sock_sem;	/* Stop connect races */
	struct list_head read_list;	/* On this list when ready for reading */
	struct list_head write_list;	/* On this list when ready for writing */
	struct list_head state_list;	/* On this list when ready to connect */
	unsigned long flags;	/* bit 1,2 = We are on the read/write lists */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_IS_OTHERCON 4
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	struct list_head listenlist;	/* List of allocated listening sockets */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
	atomic_t waiting_requests;
#define MAX_CONNECT_RETRIES 3
	struct connection *othercon;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
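/*
 * sock2con() relies on add_sock()/accept_from_sock() below storing the
 * owning connection in sk_user_data, so that the sk_* callbacks can map
 * a bare struct sock back to its connection.
 */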
/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};
static struct sockaddr_storage dlm_local_addr;

static struct task_struct *recv_task;
static struct task_struct *send_task;

static wait_queue_t lowcomms_send_waitq_head;
static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
static wait_queue_t lowcomms_recv_waitq_head;
static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);

/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
static DECLARE_MUTEX(connections_lock);
static struct kmem_cache *con_cache;
static int conn_array_size;

/* List of sockets that have reads pending */
static LIST_HEAD(read_sockets);
static DEFINE_SPINLOCK(read_sockets_lock);

/* List of sockets which have writes pending */
static LIST_HEAD(write_sockets);
static DEFINE_SPINLOCK(write_sockets_lock);

/* List of sockets which have connects pending */
static LIST_HEAD(state_sockets);
static DEFINE_SPINLOCK(state_sockets_lock);
static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con = NULL;

	down(&connections_lock);
	if (nodeid >= conn_array_size) {
		int new_size = nodeid + NODE_INCREMENT;
		struct connection **new_conns;

		new_conns = kzalloc(sizeof(struct connection *) *
				    new_size, allocation);
		if (!new_conns)
			goto finish;

		memcpy(new_conns, connections,
		       sizeof(struct connection *) * conn_array_size);
		conn_array_size = new_size;
		kfree(connections);
		connections = new_conns;
	}

	con = connections[nodeid];
	if (con == NULL && allocation) {
		con = kmem_cache_zalloc(con_cache, allocation);
		if (!con)
			goto finish;

		con->nodeid = nodeid;
		init_rwsem(&con->sock_sem);
		INIT_LIST_HEAD(&con->writequeue);
		spin_lock_init(&con->writequeue_lock);

		connections[nodeid] = con;
	}

finish:
	up(&connections_lock);
	return con;
}
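/*
 * Callers use a zero gfp mask for a pure lookup that may return NULL
 * (e.g. nodeid2con(nodeid, 0) in dlm_lowcomms_close() below), and a
 * real mask such as GFP_KERNEL to create the connection on first use.
 */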
/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);

	atomic_inc(&con->waiting_requests);
	if (test_and_set_bit(CF_READ_PENDING, &con->flags))
		return;

	spin_lock_bh(&read_sockets_lock);
	list_add_tail(&con->read_list, &read_sockets);
	spin_unlock_bh(&read_sockets_lock);

	wake_up_interruptible(&lowcomms_recv_waitq);
}
static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		return;

	spin_lock_bh(&write_sockets_lock);
	list_add_tail(&con->write_list, &write_sockets);
	spin_unlock_bh(&write_sockets_lock);

	wake_up_interruptible(&lowcomms_send_waitq);
}
static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		return;

	spin_lock_bh(&state_sockets_lock);
	list_add_tail(&con->state_list, &state_sockets);
	spin_unlock_bh(&state_sockets_lock);

	wake_up_interruptible(&lowcomms_send_waitq);
}
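/*
 * Note the shared pattern in the three queueing functions above:
 * test_and_set_bit() makes requeueing idempotent, so a connection sits
 * on each pending list at most once however often its socket fires,
 * and the daemon clears the bit again before processing the connection.
 */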
static void lowcomms_state_change(struct sock *sk)
{
	if (sk->sk_state == TCP_ESTABLISHED)
		lowcomms_write_space(sk);
}
/* Make a socket active */
static int add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;

	return 0;
}
/* Add the port number to an IP6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr.ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
}
/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	down_write(&con->sock_sem);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}
	con->retries = 0;
	up_write(&con->sock_sem);
}
/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg;
	struct iovec iov[2];
	mm_segment_t fs;
	unsigned len;
	int r;
	int call_again_soon = 0;

	down_read(&con->sock_sem);

	if (con->sock == NULL)
		goto out;
	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_iovlen = 1;
	msg.msg_iov = iov;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_flags = 0;

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		msg.msg_iovlen = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;
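	/*
	 * Worked example (illustrative numbers): on a 4096-byte page with
	 * base = 3000 and cbuf_data() = 3500, iov[0] covers bytes
	 * 3500..4095 (596 bytes) and iov[1] covers bytes 0..2999 (3000
	 * bytes), i.e. all free space up to the unconsumed data at base.
	 */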
	fs = get_fs();
	set_fs(get_ds());
	r = ret = sock_recvmsg(con->sock, &msg, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	set_fs(fs);

	if (ret <= 0)
		goto out_close;
	if (ret == len)
		call_again_soon = 1;
	cbuf_add(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
		       "iov_len=%u, iov_base[0]=%p, read=%d\n",
		       page_address(con->rx_page), con->cb.base, con->cb.len,
		       len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	cbuf_eat(&con->cb, ret);

	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

out:
	if (call_again_soon)
		goto out_resched;
	up_read(&con->sock_sem);
	return 0;

out_resched:
	lowcomms_data_ready(con->sock->sk, 0);
	up_read(&con->sock_sem);
	cond_resched();
	return 0;

out_close:
	up_read(&con->sock_sem);
	if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
		close_connection(con, false);
		/* Reconnect when there is something to send */
	}

	return ret;
}
/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
{
	int result;
	int len;
	int nodeid;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	struct connection *newcon;

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &newsock);
	if (result < 0)
		return -ENOMEM;

	down_read(&con->sock_sem);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
		printk("dlm: connect from non cluster node\n");
		sock_release(newsock);
		up_read(&con->sock_sem);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 *
	 * In this case we store the incoming one in "othercon"
	 */
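	/*
	 * Illustrative timeline (hypothetical nodeids): nodes 2 and 3 call
	 * connect_to_sock() at the same moment; each then accepts the
	 * other's incoming socket while its own outgoing socket is already
	 * installed, so the accepted socket is parked on "othercon"
	 * instead of being dropped.
	 */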
	newcon = nodeid2con(nodeid, GFP_KERNEL);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	down_write(&newcon->sock_sem);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
			if (!othercon) {
				printk("dlm: failed to allocate incoming socket\n");
				up_write(&newcon->sock_sem);
				result = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			init_rwsem(&othercon->sock_sem);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
			newcon->othercon = othercon;
		}
		othercon->sock = newsock;
		newsock->sk->sk_user_data = othercon;
		add_sock(newsock, othercon);
	}
	else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
	}

	up_write(&newcon->sock_sem);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	lowcomms_data_ready(newsock->sk, 0);
	up_read(&con->sock_sem);

	return 0;

accept_err:
	up_read(&con->sock_sem);
	sock_release(newsock);

	if (result != -EAGAIN)
		printk("dlm: error accepting connection from node: %d\n",
		       result);
	return result;
}
/* Connect a new socket to its peer */
static void connect_to_sock(struct connection *con)
{
	int result = -EHOSTUNREACH;
	struct sockaddr_storage saddr;
	int addr_len;
	struct socket *sock;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	down_write(&con->sock_sem);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock) {
		result = 0;
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	if (dlm_nodeid_to_addr(con->nodeid, &saddr))
		goto out_err;

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;

	make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);

	add_sock(sock, con);

	log_print("connecting to %d", con->nodeid);
	result =
		sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
	    result != -ENETDOWN && result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		lowcomms_connect_sock(con);
		result = 0;
	}
out:
	up_write(&con->sock_sem);
	return;
}
static struct socket *create_listen_sock(struct connection *con,
					 struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr.ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0) {
		printk("dlm: Can't create listening comms socket\n");
		goto create_out;
	}

	result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				 (char *)&one, sizeof(one));
	if (result < 0) {
		printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
		       result);
	}

	sock->sk->sk_user_data = con;
	con->rx_action = accept_from_sock;
	con->sock = sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}

	result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				 (char *)&one, sizeof(one));
	if (result < 0) {
		printk("dlm: Set keepalive failed: %d\n", result);
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}
/* Listen on all interfaces */
static int listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_KERNEL);
	int result = -EINVAL;

	/* We don't support multi-homed hosts */
	set_bit(CF_IS_OTHERCON, &con->flags);

	sock = create_listen_sock(con, &dlm_local_addr);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	}
	else {
		result = -EADDRINUSE;
	}

	return result;
}
static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}
	entry->con = con;

	return entry;
}
void *dlm_lowcomms_get_buffer(int nodeid, int len,
			      gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;
	int users = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		users = e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		if (users == 0)
			kmap(e->page);
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		users = e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}
void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	kunmap(e->page);
	spin_unlock(&con->writequeue_lock);

	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
		spin_lock_bh(&write_sockets_lock);
		list_add_tail(&con->write_list, &write_sockets);
		spin_unlock_bh(&write_sockets_lock);

		wake_up_interruptible(&lowcomms_send_waitq);
	}
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}
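/*
 * A minimal sketch of the intended two-phase use of the pair above
 * (lowcomms_send_message() below is the real in-tree caller): reserve
 * space on the node's writequeue, copy the message in, then commit.
 *
 *	char *p;
 *	void *mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_KERNEL, &p);
 *	if (!mh)
 *		return -ENOBUFS;
 *	memcpy(p, buf, len);
 *	dlm_lowcomms_commit_buffer(mh);
 */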
static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}
/* Send as much queued data for one connection as the socket will take */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;

	down_read(&con->sock_sem);
	if (con->sock == NULL)
		goto out_connect;

	sendpage = con->sock->ops->sendpage;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = sendpage(con->sock, e->page, offset, len,
				       msg_flags);
			if (ret == -EAGAIN || ret == 0)
				goto out;
			if (ret <= 0)
				goto send_error;
		}
		else {
			/* Don't starve people filling buffers */
			cond_resched();
		}

		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			free_entry(e);
			continue;
		}
	}
	spin_unlock(&con->writequeue_lock);
out:
	up_read(&con->sock_sem);
	return;

send_error:
	up_read(&con->sock_sem);
	close_connection(con, false);
	lowcomms_connect_sock(con);
	return;

out_connect:
	up_read(&con->sock_sem);
	lowcomms_connect_sock(con);
	return;
}
static void clean_one_writequeue(struct connection *con)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock(&con->writequeue_lock);
	list_for_each_safe(list, temp, &con->writequeue) {
		struct writequeue_entry *e =
			list_entry(list, struct writequeue_entry, list);
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}
/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;

	if (!connections)
		goto out;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		clean_one_writequeue(con);
		close_connection(con, true);
		atomic_set(&con->waiting_requests, 0);
	}
	return 0;

out:
	return -1;
}
/* API send message call, may queue the request */
/* N.B. This is the old interface - use the new one for new calls */
int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
{
	struct writequeue_entry *e;
	char *b;

	e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
	if (e) {
		memcpy(b, buf, len);
		dlm_lowcomms_commit_buffer(e);
		return 0;
	}
	return -ENOBUFS;
}
/* Look for activity on active sockets */
static void process_sockets(void)
{
	struct list_head *list;
	struct list_head *temp;
	int count = 0;

	spin_lock_bh(&read_sockets_lock);
	list_for_each_safe(list, temp, &read_sockets) {

		struct connection *con =
			list_entry(list, struct connection, read_list);
		list_del(&con->read_list);
		clear_bit(CF_READ_PENDING, &con->flags);

		spin_unlock_bh(&read_sockets_lock);

		/* This can reach zero if we are processing requests
		 * as they come in.
		 */
		if (atomic_read(&con->waiting_requests) == 0) {
			spin_lock_bh(&read_sockets_lock);
			continue;
		}

		do {
			con->rx_action(con);

			/* Don't starve out everyone else */
			if (++count >= MAX_RX_MSG_COUNT) {
				cond_resched();
				count = 0;
			}

		} while (!atomic_dec_and_test(&con->waiting_requests) &&
			 !kthread_should_stop());

		spin_lock_bh(&read_sockets_lock);
	}
	spin_unlock_bh(&read_sockets_lock);
}
/* Try to send any messages that are pending */
static void process_output_queue(void)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock_bh(&write_sockets_lock);
	list_for_each_safe(list, temp, &write_sockets) {
		struct connection *con =
			list_entry(list, struct connection, write_list);
		clear_bit(CF_WRITE_PENDING, &con->flags);
		list_del(&con->write_list);

		spin_unlock_bh(&write_sockets_lock);
		send_to_sock(con);
		spin_lock_bh(&write_sockets_lock);
	}
	spin_unlock_bh(&write_sockets_lock);
}
static void process_state_queue(void)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock_bh(&state_sockets_lock);
	list_for_each_safe(list, temp, &state_sockets) {
		struct connection *con =
			list_entry(list, struct connection, state_list);
		list_del(&con->state_list);
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		spin_unlock_bh(&state_sockets_lock);

		connect_to_sock(con);
		spin_lock_bh(&state_sockets_lock);
	}
	spin_unlock_bh(&state_sockets_lock);
}
/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	int nodeid;

	for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
		struct connection *con = nodeid2con(nodeid, 0);

		if (con)
			clean_one_writequeue(con);
	}
}
static int read_list_empty(void)
{
	int status;

	spin_lock_bh(&read_sockets_lock);
	status = list_empty(&read_sockets);
	spin_unlock_bh(&read_sockets_lock);

	return status;
}
/* DLM Transport comms receive daemon */
static int dlm_recvd(void *data)
{
	init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
	add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (read_list_empty())
			cond_resched();
		set_current_state(TASK_RUNNING);

		process_sockets();
	}

	return 0;
}
static int write_and_state_lists_empty(void)
{
	int status;

	spin_lock_bh(&write_sockets_lock);
	status = list_empty(&write_sockets);
	spin_unlock_bh(&write_sockets_lock);

	spin_lock_bh(&state_sockets_lock);
	if (list_empty(&state_sockets) == 0)
		status = 0;
	spin_unlock_bh(&state_sockets_lock);

	return status;
}
/* DLM Transport send daemon */
static int dlm_sendd(void *data)
{
	init_waitqueue_entry(&lowcomms_send_waitq_head, current);
	add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (write_and_state_lists_empty())
			cond_resched();
		set_current_state(TASK_RUNNING);

		process_state_queue();
		process_output_queue();
	}

	return 0;
}
static void daemons_stop(void)
{
	kthread_stop(recv_task);
	kthread_stop(send_task);
}
static int daemons_start(void)
{
	struct task_struct *p;
	int error;

	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
	error = IS_ERR(p);
	if (error) {
		log_print("can't start dlm_recvd %d", error);
		return error;
	}
	recv_task = p;

	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
	error = IS_ERR(p);
	if (error) {
		log_print("can't start dlm_sendd %d", error);
		kthread_stop(recv_task);
		return error;
	}
	send_task = p;

	return 0;
}
/*
 * Return the largest buffer size we can cope with.
 */
int lowcomms_max_buffer_size(void)
{
	return PAGE_CACHE_SIZE;
}
void dlm_lowcomms_stop(void)
{
	int i;

	/* Set all the flags to prevent any
	   socket activity.
	*/
	for (i = 0; i < conn_array_size; i++) {
		if (connections[i])
			connections[i]->flags |= 0xFF;
	}
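	/*
	 * 0xFF covers every CF_* bit defined above, so test_and_set_bit()
	 * in the socket callbacks always sees the bit already set and
	 * nothing new is queued while the connections are torn down.
	 */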
	daemons_stop();
	clean_writequeues();

	for (i = 0; i < conn_array_size; i++) {
		if (connections[i]) {
			close_connection(connections[i], true);
			if (connections[i]->othercon)
				kmem_cache_free(con_cache,
						connections[i]->othercon);
			kmem_cache_free(con_cache, connections[i]);
		}
	}

	kfree(connections);
	connections = NULL;

	kmem_cache_destroy(con_cache);
}
/* This is quite likely to sleep... */
int dlm_lowcomms_start(void)
{
	int error = -ENOMEM;

	connections = kzalloc(sizeof(struct connection *) *
			      NODE_INCREMENT, GFP_KERNEL);
	if (!connections)
		goto out;

	conn_array_size = NODE_INCREMENT;

	if (dlm_our_addr(&dlm_local_addr, 0)) {
		log_print("no local IP address has been set");
		goto fail_free_conn;
	}
	if (!dlm_our_addr(&dlm_local_addr, 1)) {
		log_print("This dlm comms module does not support "
			  "multi-homed clustering");
		goto fail_free_conn;
	}

	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL, NULL);
	if (!con_cache)
		goto fail_free_conn;

	/* Start listening */
	error = listen_for_all();
	if (error)
		goto fail_unlisten;

	error = daemons_start();
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	close_connection(connections[0], false);
	kmem_cache_free(con_cache, connections[0]);
	kmem_cache_destroy(con_cache);

fail_free_conn:
	kfree(connections);

out:
	return error;
}
/*
 * Overrides for Emacs so that we follow Linus's tabbing style.
 * Emacs will notice this stuff at the end of the file and automatically
 * adjust the settings for this buffer only. This must remain at the end
 * of the file.
 * ---------------------------------------------------------------------------
 * Local variables:
 * c-file-style: "linux"
 * End:
 */