/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever else it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * I don't see any problem with the recv thread executing the locking
 * code on behalf of remote processes as the locking code is
 * short, efficient and never waits.
 */
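
/*
 * Orientation summary of the code below (added for orientation, not part of
 * the original comments): senders reserve space in a per-connection
 * writequeue with dlm_lowcomms_get_buffer()/dlm_lowcomms_commit_buffer();
 * the send daemon (dlm_sendd) walks write_sockets and state_sockets,
 * calling send_to_sock() and connect_to_sock() as needed.  On the receive
 * side the socket's sk_data_ready callback queues the connection on
 * read_sockets and wakes the receive daemon (dlm_recvd), which calls
 * receive_from_sock() and hands complete messages to
 * dlm_process_incoming_buffer().
 */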

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/kthread.h>
#include <linux/pagemap.h>

#include "dlm_internal.h"

#define NODE_INCREMENT 32

/* Bookkeeping for the circular receive buffer kept in each connection's
   rx_page; these are the fields the CBUF_* macros below operate on. */
struct cbuf {
	unsigned base;
	unsigned len;
	unsigned mask;
};

#define CBUF_INIT(cb, size) do { (cb)->base = (cb)->len = 0; (cb)->mask = ((size)-1); } while(0)
#define CBUF_ADD(cb, n) do { (cb)->len += n; } while(0)
#define CBUF_EMPTY(cb) ((cb)->len == 0)
#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))
#define CBUF_EAT(cb, n) do { (cb)->len -= (n); \
			     (cb)->base += (n); (cb)->base &= (cb)->mask; } while(0)
#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)
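
/*
 * Worked example of the macros above (illustrative numbers only, not from
 * the original code): with a 4096-byte page, CBUF_INIT gives base = 0,
 * len = 0, mask = 4095.  After 100 bytes arrive, CBUF_ADD makes len = 100
 * and CBUF_DATA = 100, the offset where the next read should land.  If
 * midcomms then consumes 60 bytes, CBUF_EAT leaves base = 60, len = 40.
 * Once base + len exceeds the mask, the "& mask" in CBUF_DATA wraps the
 * write position back to the start of the page, which is why
 * receive_from_sock() may need two iovecs for a single read.
 */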

/* Maximum number of incoming messages to process before
   yielding to other work */
#define MAX_RX_MSG_COUNT 25

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct rw_semaphore sock_sem;	/* Stop connect races */
	struct list_head read_list;	/* On this list when ready for reading */
	struct list_head write_list;	/* On this list when ready for writing */
	struct list_head state_list;	/* On this list when ready to connect */
	unsigned long flags;	/* bit 1,2 = We are on the read/write lists */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_IS_OTHERCON 4
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	struct list_head listenlist;	/* List of allocated listening sockets */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	struct page *rx_page;	/* Circular receive buffer page */
	struct cbuf cb;		/* Position within rx_page */
	int retries;		/* Connect attempts so far */
	atomic_t waiting_requests;
#define MAX_CONNECT_RETRIES 3
	struct connection *othercon;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
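
/* sock2con() works because every socket this file creates has sk_user_data
   pointed at its owning connection; see accept_from_sock(),
   connect_to_sock() and create_listen_sock() below. */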

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;	/* Page holding the message data */
	int offset;		/* Start of unsent data within the page */
	int len;		/* Amount of committed, unsent data */
	int end;		/* End of the space handed out so far */
	int users;		/* Buffers handed out but not yet committed */
	struct connection *con;
};

static struct sockaddr_storage dlm_local_addr;

static struct task_struct *recv_task;
static struct task_struct *send_task;

static wait_queue_t lowcomms_send_waitq_head;
static wait_queue_head_t lowcomms_send_waitq;
static wait_queue_t lowcomms_recv_waitq_head;
static wait_queue_head_t lowcomms_recv_waitq;

/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
static struct semaphore connections_lock;
static kmem_cache_t *con_cache;
static int conn_array_size;
static atomic_t accepting;

/* List of sockets that have reads pending */
static struct list_head read_sockets;
static spinlock_t read_sockets_lock;

/* List of sockets which have writes pending */
static struct list_head write_sockets;
static spinlock_t write_sockets_lock;

/* List of sockets which have connects pending */
static struct list_head state_sockets;
static spinlock_t state_sockets_lock;

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con = NULL;

	down(&connections_lock);
	if (nodeid >= conn_array_size) {
		int new_size = nodeid + NODE_INCREMENT;
		struct connection **new_conns;

		new_conns = kmalloc(sizeof(struct connection *) *
				    new_size, allocation);
		if (!new_conns)
			goto finish;

		memset(new_conns, 0, sizeof(struct connection *) * new_size);
		memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
		conn_array_size = new_size;

		kfree(connections);
		connections = new_conns;
	}

	con = connections[nodeid];
	if (con == NULL && allocation) {
		con = kmem_cache_alloc(con_cache, allocation);
		if (!con)
			goto finish;

		memset(con, 0, sizeof(*con));
		con->nodeid = nodeid;
		init_rwsem(&con->sock_sem);
		INIT_LIST_HEAD(&con->writequeue);
		spin_lock_init(&con->writequeue_lock);

		connections[nodeid] = con;
	}

finish:
	up(&connections_lock);
	return con;
}
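
/* Note: nodeid 0 is reserved for the listening connection (see
   listen_for_all() below); real cluster nodeids start at 1, which is why
   clean_writequeues() iterates from 1 and connect_to_sock() refuses
   nodeid 0. */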

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);

	atomic_inc(&con->waiting_requests);
	if (test_and_set_bit(CF_READ_PENDING, &con->flags))
		return;

	spin_lock_bh(&read_sockets_lock);
	list_add_tail(&con->read_list, &read_sockets);
	spin_unlock_bh(&read_sockets_lock);

	wake_up_interruptible(&lowcomms_recv_waitq);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		return;

	spin_lock_bh(&write_sockets_lock);
	list_add_tail(&con->write_list, &write_sockets);
	spin_unlock_bh(&write_sockets_lock);

	wake_up_interruptible(&lowcomms_send_waitq);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		return;
	if (!atomic_read(&accepting))
		return;

	spin_lock_bh(&state_sockets_lock);
	list_add_tail(&con->state_list, &state_sockets);
	spin_unlock_bh(&state_sockets_lock);

	wake_up_interruptible(&lowcomms_send_waitq);
}

static void lowcomms_state_change(struct sock *sk)
{
	/* struct connection *con = sock2con(sk); */

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		lowcomms_write_space(sk);
		break;

	default:
		/* FIXME: I think this causes more trouble than it solves.
		   lowcomms will reconnect anyway when there is something to
		   send. This just attempts reconnection if a node goes down!
		*/
		/* lowcomms_connect_sock(con); */
		printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state);
		break;
	}
}

/* Make a socket active */
static int add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;

	return 0;
}

/* Add the port number to an IP6 or 4 sockaddr and return the address
   length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr.ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, int and_other)
{
	down_write(&con->sock_sem);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Argh! recursion in kernel code!
		   Actually, this isn't a list so it
		   will only re-enter once.
		*/
		close_connection(con->othercon, FALSE);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

	up_write(&con->sock_sem);
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg;
	struct iovec iov[2];
	unsigned len;
	int r;
	int call_again_soon = 0;

	down_read(&con->sock_sem);

	if (con->sock == NULL)
		goto out;
	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		CBUF_INIT(&con->cb, PAGE_CACHE_SIZE);
	}

	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_iov = iov;
	msg.msg_iovlen = 1;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_flags = 0;

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb);
	iov[1].iov_len = 0;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (CBUF_DATA(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		msg.msg_iovlen = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;

	r = ret = sock_recvmsg(con->sock, &msg, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	if (ret <= 0)
		goto out_close;
	if (ret == len)
		call_again_soon = 1;

	CBUF_ADD(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
		       "iov_len=%u, iov_base[0]=%p, read=%d\n",
		       page_address(con->rx_page), con->cb.base, con->cb.len,
		       len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	CBUF_EAT(&con->cb, ret);

	if (CBUF_EMPTY(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

out:
	if (call_again_soon)
		goto out_resched;
	up_read(&con->sock_sem);
	return 0;

out_resched:
	lowcomms_data_ready(con->sock->sk, 0);
	up_read(&con->sock_sem);
	cond_resched();
	return 0;

out_close:
	up_read(&con->sock_sem);
	if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
		close_connection(con, FALSE);
		/* Reconnect when there is something to send */
	}
	return ret;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
{
	int result;
	int len, nodeid;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	struct connection *newcon;

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &newsock);
	if (result < 0)
		return result;

	down_read(&con->sock_sem);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
		printk("dlm: connect from non cluster node\n");
		sock_release(newsock);
		up_read(&con->sock_sem);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/* Check to see if we already have a connection to this node. This
	 * could happen if the two nodes initiate a connection at roughly
	 * the same time and the connections cross on the wire.
	 *
	 * In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_KERNEL);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	down_write(&newcon->sock_sem);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_alloc(con_cache, GFP_KERNEL);
			if (!othercon) {
				printk("dlm: failed to allocate incoming socket\n");
				up_write(&newcon->sock_sem);
				result = -ENOMEM;
				goto accept_err;
			}
			memset(othercon, 0, sizeof(*othercon));
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			init_rwsem(&othercon->sock_sem);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
			newcon->othercon = othercon;
		}
		othercon->sock = newsock;
		newsock->sk->sk_user_data = othercon;
		add_sock(newsock, othercon);
	}
	else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
	}

	up_write(&newcon->sock_sem);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	lowcomms_data_ready(newsock->sk, 0);
	up_read(&con->sock_sem);

	return 0;

accept_err:
	up_read(&con->sock_sem);
	sock_release(newsock);

	if (result != -EAGAIN)
		printk("dlm: error accepting connection from node: %d\n", result);
	return result;
}

/* Connect a new socket to its peer */
static int connect_to_sock(struct connection *con)
{
	int result = -EHOSTUNREACH;
	struct sockaddr_storage saddr;
	struct socket *sock;
	int addr_len;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return 0;
	}

	down_write(&con->sock_sem);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock) {
		result = 0;
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	if (dlm_nodeid_to_addr(con->nodeid, &saddr)) {
		sock_release(sock);
		result = -EHOSTUNREACH;
		goto out_err;
	}

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;

	make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);

	add_sock(sock, con);

	log_print("connecting to %d", con->nodeid);
	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result != 0)
		goto out_err;

out:
	up_write(&con->sock_sem);
	/*
	 * Returning an error here means we've given up trying to connect to
	 * a remote node, otherwise we return 0 and reschedule the connection
	 * attempt
	 */
	return result;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
	    result != -ENETDOWN && result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		lowcomms_connect_sock(con);
		result = 0;
	}
	goto out;
}

/* Create a listening socket, bound to our address and the DLM TCP port */
static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr.ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0) {
		printk("dlm: Can't create listening comms socket\n");
		goto create_out;
	}

	result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
	if (result < 0)
		printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n", result);

	sock->sk->sk_user_data = con;
	con->rx_action = accept_from_sock;
	con->sock = sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
	if (result < 0) {
		printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}

	result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one));
	if (result < 0)
		printk("dlm: Set keepalive failed: %d\n", result);

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Listen on all interfaces */
static int listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_KERNEL);
	int result = -EINVAL;

	if (!con)
		return -ENOMEM;

	/* We don't support multi-homed hosts */
	memset(con, 0, sizeof(*con));
	init_rwsem(&con->sock_sem);
	spin_lock_init(&con->writequeue_lock);
	INIT_LIST_HEAD(&con->writequeue);
	set_bit(CF_IS_OTHERCON, &con->flags);

	sock = create_listen_sock(con, &dlm_local_addr);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	}
	else
		result = -EADDRINUSE;

	return result;
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len,
			      gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;

	if (!atomic_read(&accepting))
		return NULL;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if (((struct list_head *) e == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	if (!atomic_read(&accepting))
		return;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	spin_unlock(&con->writequeue_lock);

	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
		spin_lock_bh(&write_sockets_lock);
		list_add_tail(&con->write_list, &write_sockets);
		spin_unlock_bh(&write_sockets_lock);

		wake_up_interruptible(&lowcomms_send_waitq);
	}
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}
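
/*
 * Illustrative use of the two calls above (a sketch, not code from this
 * file; lowcomms_send_message() below is the real in-tree caller):
 *
 *	char *p;
 *	void *handle = dlm_lowcomms_get_buffer(nodeid, len, GFP_KERNEL, &p);
 *	if (!handle)
 *		return -ENOBUFS;
 *	memcpy(p, msg, len);	(build the message in place)
 *	dlm_lowcomms_commit_buffer(handle);
 *
 * The data then sits in the connection's writequeue until dlm_sendd pushes
 * it out via send_to_sock().
 */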

static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}

/* Send as much queued data as the socket will take */
static int send_to_sock(struct connection *con)
{
	int ret = 0;
	ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;

	down_read(&con->sock_sem);
	if (con->sock == NULL)
		goto out_connect;

	sendpage = con->sock->ops->sendpage;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = sendpage(con->sock, e->page, offset, len,
				       msg_flags);
			if (ret == -EAGAIN || ret == 0)
				goto out;
			if (ret <= 0)
				goto send_error;
		}
		else {
			/* Don't starve people filling buffers */
			cond_resched();
		}

		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			free_entry(e);
		}
	}
	spin_unlock(&con->writequeue_lock);

out:
	up_read(&con->sock_sem);
	return ret;

send_error:
	up_read(&con->sock_sem);
	close_connection(con, FALSE);
	lowcomms_connect_sock(con);
	return ret;

out_connect:
	up_read(&con->sock_sem);
	lowcomms_connect_sock(con);
	return 0;
}

static void clean_one_writequeue(struct connection *con)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock(&con->writequeue_lock);
	list_for_each_safe(list, temp, &con->writequeue) {
		struct writequeue_entry *e =
			list_entry(list, struct writequeue_entry, list);
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;

	if (!connections)
		return 0;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		clean_one_writequeue(con);
		close_connection(con, TRUE);
		atomic_set(&con->waiting_requests, 0);
	}
	return 0;
}

/* API send message call, may queue the request */
/* N.B. This is the old interface - use the new one for new calls */
int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
{
	struct writequeue_entry *e;
	char *b;

	e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
	if (e) {
		memcpy(b, buf, len);
		dlm_lowcomms_commit_buffer(e);
		return 0;
	}
	return -ENOBUFS;
}

/* Look for activity on active sockets */
static void process_sockets(void)
{
	struct list_head *list;
	struct list_head *temp;
	int count = 0;

	spin_lock_bh(&read_sockets_lock);
	list_for_each_safe(list, temp, &read_sockets) {

		struct connection *con =
			list_entry(list, struct connection, read_list);
		list_del(&con->read_list);
		clear_bit(CF_READ_PENDING, &con->flags);

		spin_unlock_bh(&read_sockets_lock);

		/* This can reach zero if we are processing requests
		 * as fast as they come in.
		 */
		if (atomic_read(&con->waiting_requests) == 0) {
			spin_lock_bh(&read_sockets_lock);
			continue;
		}

		do {
			con->rx_action(con);

			/* Don't starve out everyone else */
			if (++count >= MAX_RX_MSG_COUNT) {
				cond_resched();
				count = 0;
			}

		} while (!atomic_dec_and_test(&con->waiting_requests) &&
			 !kthread_should_stop());

		spin_lock_bh(&read_sockets_lock);
	}
	spin_unlock_bh(&read_sockets_lock);
}

/* Try to send any messages that are pending */
static void process_output_queue(void)
{
	struct list_head *list;
	struct list_head *temp;
	int ret;

	spin_lock_bh(&write_sockets_lock);
	list_for_each_safe(list, temp, &write_sockets) {
		struct connection *con =
			list_entry(list, struct connection, write_list);
		clear_bit(CF_WRITE_PENDING, &con->flags);
		list_del(&con->write_list);

		spin_unlock_bh(&write_sockets_lock);

		ret = send_to_sock(con);

		spin_lock_bh(&write_sockets_lock);
	}
	spin_unlock_bh(&write_sockets_lock);
}

static void process_state_queue(void)
{
	struct list_head *list;
	struct list_head *temp;
	int ret;

	spin_lock_bh(&state_sockets_lock);
	list_for_each_safe(list, temp, &state_sockets) {
		struct connection *con =
			list_entry(list, struct connection, state_list);
		list_del(&con->state_list);
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		spin_unlock_bh(&state_sockets_lock);

		ret = connect_to_sock(con);

		spin_lock_bh(&state_sockets_lock);
	}
	spin_unlock_bh(&state_sockets_lock);
}

/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	int nodeid;

	for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
		struct connection *con = nodeid2con(nodeid, 0);

		if (con)
			clean_one_writequeue(con);
	}
}

static int read_list_empty(void)
{
	int status;

	spin_lock_bh(&read_sockets_lock);
	status = list_empty(&read_sockets);
	spin_unlock_bh(&read_sockets_lock);

	return status;
}

/* DLM Transport comms receive daemon */
static int dlm_recvd(void *data)
{
	init_waitqueue_head(&lowcomms_recv_waitq);
	init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
	add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (read_list_empty())
			schedule();
		set_current_state(TASK_RUNNING);

		process_sockets();
	}

	return 0;
}

static int write_and_state_lists_empty(void)
{
	int status;

	spin_lock_bh(&write_sockets_lock);
	status = list_empty(&write_sockets);
	spin_unlock_bh(&write_sockets_lock);

	spin_lock_bh(&state_sockets_lock);
	if (list_empty(&state_sockets) == 0)
		status = 0;
	spin_unlock_bh(&state_sockets_lock);

	return status;
}

/* DLM Transport send daemon */
static int dlm_sendd(void *data)
{
	init_waitqueue_head(&lowcomms_send_waitq);
	init_waitqueue_entry(&lowcomms_send_waitq_head, current);
	add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (write_and_state_lists_empty())
			schedule();
		set_current_state(TASK_RUNNING);

		process_state_queue();
		process_output_queue();
	}

	return 0;
}

static void daemons_stop(void)
{
	kthread_stop(recv_task);
	kthread_stop(send_task);
}

static int daemons_start(void)
{
	struct task_struct *p;
	int error;

	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
	if (IS_ERR(p)) {
		error = PTR_ERR(p);
		log_print("can't start dlm_recvd %d", error);
		return error;
	}
	recv_task = p;

	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
	if (IS_ERR(p)) {
		error = PTR_ERR(p);
		log_print("can't start dlm_sendd %d", error);
		kthread_stop(recv_task);
		return error;
	}
	send_task = p;

	return 0;
}

/*
 * Return the largest buffer size we can cope with.
 */
int lowcomms_max_buffer_size(void)
{
	return PAGE_CACHE_SIZE;
}

void dlm_lowcomms_stop(void)
{
	int i;

	atomic_set(&accepting, 0);

	/* Set all the activity flags to prevent any
	   socket activity.
	 */
	for (i = 0; i < conn_array_size; i++) {
		if (connections[i])
			connections[i]->flags |= 0x7;
	}

	daemons_stop();
	clean_writequeues();

	for (i = 0; i < conn_array_size; i++) {
		if (connections[i]) {
			close_connection(connections[i], TRUE);
			if (connections[i]->othercon)
				kmem_cache_free(con_cache, connections[i]->othercon);
			kmem_cache_free(con_cache, connections[i]);
		}
	}

	kfree(connections);
	connections = NULL;

	kmem_cache_destroy(con_cache);
}

/* This is quite likely to sleep... */
int dlm_lowcomms_start(void)
{
	int error = 0;

	/*
	 * Temporarily initialise the waitq head so that lowcomms_send_message
	 * doesn't crash if it gets called before the thread is fully
	 * initialised
	 */
	init_waitqueue_head(&lowcomms_send_waitq);

	error = -ENOMEM;
	connections = kmalloc(sizeof(struct connection *) *
			      NODE_INCREMENT, GFP_KERNEL);
	if (!connections)
		goto out;

	memset(connections, 0,
	       sizeof(struct connection *) * NODE_INCREMENT);

	conn_array_size = NODE_INCREMENT;

	if (dlm_our_addr(&dlm_local_addr, 0)) {
		log_print("no local IP address has been set");
		goto fail_free_conn;
	}
	if (!dlm_our_addr(&dlm_local_addr, 1)) {
		log_print("This dlm comms module does not support multi-homed clustering");
		goto fail_free_conn;
	}

	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0, NULL, NULL);
	if (!con_cache)
		goto fail_free_conn;

	/* Start listening */
	error = listen_for_all();
	if (error)
		goto fail_unlisten;

	error = daemons_start();
	if (error)
		goto fail_unlisten;

	atomic_set(&accepting, 1);

	return 0;

fail_unlisten:
	close_connection(connections[0], 0);
	kmem_cache_free(con_cache, connections[0]);
	kmem_cache_destroy(con_cache);

fail_free_conn:
	kfree(connections);

out:
	return error;
}

int dlm_lowcomms_init(void)
{
	INIT_LIST_HEAD(&read_sockets);
	INIT_LIST_HEAD(&write_sockets);
	INIT_LIST_HEAD(&state_sockets);

	spin_lock_init(&read_sockets_lock);
	spin_lock_init(&write_sockets_lock);
	spin_lock_init(&state_sockets_lock);
	init_MUTEX(&connections_lock);

	return 0;
}

void dlm_lowcomms_exit(void)
{
}

/*
 * Overrides for Emacs so that we follow Linus's tabbing style.
 * Emacs will notice this stuff at the end of the file and automatically
 * adjust the settings for this buffer only. This must remain at the end
 * of the file.
 * ---------------------------------------------------------------------------
 * Local variables:
 * c-file-style: "linux"
 * End:
 */