[DLM] Add support for tcp communications
[linux-2.6.git] / fs / dlm / lowcomms-tcp.c
blob7289e59b4bd37d2b153f8856e34a3ef65949883d
1 /******************************************************************************
2 *******************************************************************************
3 **
4 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
5 ** Copyright (C) 2004-2006 Red Hat, Inc. All rights reserved.
6 **
7 ** This copyrighted material is made available to anyone wishing to use,
8 ** modify, copy, or redistribute it subject to the terms and conditions
9 ** of the GNU General Public License v.2.
11 *******************************************************************************
12 ******************************************************************************/
15 * lowcomms.c
17 * This is the "low-level" comms layer.
19 * It is responsible for sending/receiving messages
20 * from other nodes in the cluster.
22 * Cluster nodes are referred to by their nodeids. nodeids are
23 * simply 32 bit numbers to the locking module - if they need to
24 * be expanded for the cluster infrastructure then that is its
25 * responsibility. It is this layer's
26 * responsibility to resolve these into IP address or
27 * whatever it needs for inter-node communication.
29 * The comms level consists of two kernel threads: a receive thread that
30 * deals mainly with receiving messages from other nodes and passing them
31 * up to the mid-level comms layer (which understands the
32 * message format) for execution by the locking core, and
33 * a send thread which does all the setting up of connections
34 * to remote nodes and the sending of data. Threads are not allowed
35 * to send their own data because it may cause them to wait in times
36 * of high load. Also, this way, the sending thread can collect together
37 * messages bound for one node and send them in one block.
39 * I don't see any problem with the recv thread executing the locking
40 * code on behalf of remote processes as the locking code is
41 * short, efficient and never waits.
46 #include <asm/ioctls.h>
47 #include <net/sock.h>
48 #include <net/tcp.h>
49 #include <linux/pagemap.h>
51 #include "dlm_internal.h"
52 #include "lowcomms.h"
53 #include "midcomms.h"
54 #include "config.h"
/*
 * Bookkeeping for the circular receive buffer.  The storage itself is a
 * separate page (connection->rx_page); this only records where valid data
 * starts (base), how many bytes are valid (len) and the wrap mask
 * (size - 1).  The mask arithmetic assumes size is a power of two.
 */
struct cbuf {
	unsigned base;
	unsigned len;
	unsigned mask;
};

#ifndef FALSE
#define FALSE 0
#define TRUE 1
#endif

/* Step (in slots) by which the nodeid-indexed connections[] array grows */
#define NODE_INCREMENT 32

/* Reset @cb to empty for a buffer of @size bytes */
#define CBUF_INIT(cb, size)					\
	do {							\
		(cb)->base = (cb)->len = 0;			\
		(cb)->mask = ((size) - 1);			\
	} while (0)

/* Record @n bytes appended just after the current data */
#define CBUF_ADD(cb, n)						\
	do {							\
		(cb)->len += (n);				\
	} while (0)

/* Non-zero when no data is held */
#define CBUF_EMPTY(cb) ((cb)->len == 0)

/* Non-zero when @n more bytes fit and at least one byte stays free */
#define CBUF_MAY_ADD(cb, n) (((cb)->len + (n)) < ((cb)->mask + 1))

/* Drop @n bytes from the front, wrapping base at the end of the buffer */
#define CBUF_EAT(cb, n)						\
	do {							\
		(cb)->len -= (n);				\
		(cb)->base += (n);				\
		(cb)->base &= (cb)->mask;			\
	} while (0)

/* Offset of the first free byte, i.e. just past the valid data */
#define CBUF_DATA(cb) (((cb)->base + (cb)->len) & (cb)->mask)

/*
 * Maximum number of incoming messages to process before
 * doing a schedule()
 */
#define MAX_RX_MSG_COUNT 25
81 struct connection {
82 struct socket *sock; /* NULL if not connected */
83 uint32_t nodeid; /* So we know who we are in the list */
84 struct rw_semaphore sock_sem; /* Stop connect races */
85 struct list_head read_list; /* On this list when ready for reading */
86 struct list_head write_list; /* On this list when ready for writing */
87 struct list_head state_list; /* On this list when ready to connect */
88 unsigned long flags; /* bit 1,2 = We are on the read/write lists */
89 #define CF_READ_PENDING 1
90 #define CF_WRITE_PENDING 2
91 #define CF_CONNECT_PENDING 3
92 #define CF_IS_OTHERCON 4
93 struct list_head writequeue; /* List of outgoing writequeue_entries */
94 struct list_head listenlist; /* List of allocated listening sockets */
95 spinlock_t writequeue_lock;
96 int (*rx_action) (struct connection *); /* What to do when active */
97 struct page *rx_page;
98 struct cbuf cb;
99 int retries;
100 atomic_t waiting_requests;
101 #define MAX_CONNECT_RETRIES 3
102 struct connection *othercon;
104 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
106 /* An entry waiting to be sent */
107 struct writequeue_entry {
108 struct list_head list;
109 struct page *page;
110 int offset;
111 int len;
112 int end;
113 int users;
114 struct connection *con;
117 static struct sockaddr_storage dlm_local_addr;
119 /* Manage daemons */
120 static struct task_struct *recv_task;
121 static struct task_struct *send_task;
123 static wait_queue_t lowcomms_send_waitq_head;
124 static wait_queue_head_t lowcomms_send_waitq;
125 static wait_queue_t lowcomms_recv_waitq_head;
126 static wait_queue_head_t lowcomms_recv_waitq;
128 /* An array of pointers to connections, indexed by NODEID */
129 static struct connection **connections;
130 static struct semaphore connections_lock;
131 static kmem_cache_t *con_cache;
132 static int conn_array_size;
133 static atomic_t accepting;
135 /* List of sockets that have reads pending */
136 static struct list_head read_sockets;
137 static spinlock_t read_sockets_lock;
139 /* List of sockets which have writes pending */
140 static struct list_head write_sockets;
141 static spinlock_t write_sockets_lock;
143 /* List of sockets which have connects pending */
144 static struct list_head state_sockets;
145 static spinlock_t state_sockets_lock;
147 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
149 struct connection *con = NULL;
151 down(&connections_lock);
152 if (nodeid >= conn_array_size) {
153 int new_size = nodeid + NODE_INCREMENT;
154 struct connection **new_conns;
156 new_conns = kmalloc(sizeof(struct connection *) *
157 new_size, allocation);
158 if (!new_conns)
159 goto finish;
161 memset(new_conns, 0, sizeof(struct connection *) * new_size);
162 memcpy(new_conns, connections, sizeof(struct connection *) * conn_array_size);
163 conn_array_size = new_size;
164 kfree(connections);
165 connections = new_conns;
169 con = connections[nodeid];
170 if (con == NULL && allocation) {
171 con = kmem_cache_alloc(con_cache, allocation);
172 if (!con)
173 goto finish;
175 memset(con, 0, sizeof(*con));
176 con->nodeid = nodeid;
177 init_rwsem(&con->sock_sem);
178 INIT_LIST_HEAD(&con->writequeue);
179 spin_lock_init(&con->writequeue_lock);
181 connections[nodeid] = con;
184 finish:
185 up(&connections_lock);
186 return con;
189 /* Data available on socket or listen socket received a connect */
190 static void lowcomms_data_ready(struct sock *sk, int count_unused)
192 struct connection *con = sock2con(sk);
194 atomic_inc(&con->waiting_requests);
195 if (test_and_set_bit(CF_READ_PENDING, &con->flags))
196 return;
198 spin_lock_bh(&read_sockets_lock);
199 list_add_tail(&con->read_list, &read_sockets);
200 spin_unlock_bh(&read_sockets_lock);
202 wake_up_interruptible(&lowcomms_recv_waitq);
205 static void lowcomms_write_space(struct sock *sk)
207 struct connection *con = sock2con(sk);
209 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
210 return;
212 spin_lock_bh(&write_sockets_lock);
213 list_add_tail(&con->write_list, &write_sockets);
214 spin_unlock_bh(&write_sockets_lock);
216 wake_up_interruptible(&lowcomms_send_waitq);
219 static inline void lowcomms_connect_sock(struct connection *con)
221 if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
222 return;
223 if (!atomic_read(&accepting))
224 return;
226 spin_lock_bh(&state_sockets_lock);
227 list_add_tail(&con->state_list, &state_sockets);
228 spin_unlock_bh(&state_sockets_lock);
230 wake_up_interruptible(&lowcomms_send_waitq);
233 static void lowcomms_state_change(struct sock *sk)
235 /* struct connection *con = sock2con(sk); */
237 switch (sk->sk_state) {
238 case TCP_ESTABLISHED:
239 lowcomms_write_space(sk);
240 break;
242 case TCP_FIN_WAIT1:
243 case TCP_FIN_WAIT2:
244 case TCP_TIME_WAIT:
245 case TCP_CLOSE:
246 case TCP_CLOSE_WAIT:
247 case TCP_LAST_ACK:
248 case TCP_CLOSING:
249 /* FIXME: I think this causes more trouble than it solves.
250 lowcomms wil reconnect anyway when there is something to
251 send. This just attempts reconnection if a node goes down!
253 /* lowcomms_connect_sock(con); */
254 break;
256 default:
257 printk("dlm: lowcomms_state_change: state=%d\n", sk->sk_state);
258 break;
262 /* Make a socket active */
263 static int add_sock(struct socket *sock, struct connection *con)
265 con->sock = sock;
267 /* Install a data_ready callback */
268 con->sock->sk->sk_data_ready = lowcomms_data_ready;
269 con->sock->sk->sk_write_space = lowcomms_write_space;
270 con->sock->sk->sk_state_change = lowcomms_state_change;
272 return 0;
275 /* Add the port number to an IP6 or 4 sockaddr and return the address
276 length */
277 static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
278 int *addr_len)
280 saddr->ss_family = dlm_local_addr.ss_family;
281 if (saddr->ss_family == AF_INET) {
282 struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
283 in4_addr->sin_port = cpu_to_be16(port);
284 *addr_len = sizeof(struct sockaddr_in);
286 else {
287 struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
288 in6_addr->sin6_port = cpu_to_be16(port);
289 *addr_len = sizeof(struct sockaddr_in6);
293 /* Close a remote connection and tidy up */
294 static void close_connection(struct connection *con, int and_other)
296 down_write(&con->sock_sem);
298 if (con->sock) {
299 sock_release(con->sock);
300 con->sock = NULL;
302 if (con->othercon && and_other) {
303 /* Argh! recursion in kernel code!
304 Actually, this isn't a list so it
305 will only re-enter once.
307 close_connection(con->othercon, FALSE);
309 if (con->rx_page) {
310 __free_page(con->rx_page);
311 con->rx_page = NULL;
313 con->retries = 0;
314 up_write(&con->sock_sem);
317 /* Data received from remote end */
318 static int receive_from_sock(struct connection *con)
320 int ret = 0;
321 struct msghdr msg;
322 struct iovec iov[2];
323 mm_segment_t fs;
324 unsigned len;
325 int r;
326 int call_again_soon = 0;
328 down_read(&con->sock_sem);
330 if (con->sock == NULL)
331 goto out;
332 if (con->rx_page == NULL) {
334 * This doesn't need to be atomic, but I think it should
335 * improve performance if it is.
337 con->rx_page = alloc_page(GFP_ATOMIC);
338 if (con->rx_page == NULL)
339 goto out_resched;
340 CBUF_INIT(&con->cb, PAGE_CACHE_SIZE);
343 msg.msg_control = NULL;
344 msg.msg_controllen = 0;
345 msg.msg_iovlen = 1;
346 msg.msg_iov = iov;
347 msg.msg_name = NULL;
348 msg.msg_namelen = 0;
349 msg.msg_flags = 0;
352 * iov[0] is the bit of the circular buffer between the current end
353 * point (cb.base + cb.len) and the end of the buffer.
355 iov[0].iov_len = con->cb.base - CBUF_DATA(&con->cb);
356 iov[0].iov_base = page_address(con->rx_page) + CBUF_DATA(&con->cb);
357 iov[1].iov_len = 0;
360 * iov[1] is the bit of the circular buffer between the start of the
361 * buffer and the start of the currently used section (cb.base)
363 if (CBUF_DATA(&con->cb) >= con->cb.base) {
364 iov[0].iov_len = PAGE_CACHE_SIZE - CBUF_DATA(&con->cb);
365 iov[1].iov_len = con->cb.base;
366 iov[1].iov_base = page_address(con->rx_page);
367 msg.msg_iovlen = 2;
369 len = iov[0].iov_len + iov[1].iov_len;
371 fs = get_fs();
372 set_fs(get_ds());
373 r = ret = sock_recvmsg(con->sock, &msg, len,
374 MSG_DONTWAIT | MSG_NOSIGNAL);
375 set_fs(fs);
377 if (ret <= 0)
378 goto out_close;
379 if (ret == len)
380 call_again_soon = 1;
381 CBUF_ADD(&con->cb, ret);
382 ret = dlm_process_incoming_buffer(con->nodeid,
383 page_address(con->rx_page),
384 con->cb.base, con->cb.len,
385 PAGE_CACHE_SIZE);
386 if (ret == -EBADMSG) {
387 printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
388 "iov_len=%u, iov_base[0]=%p, read=%d\n",
389 page_address(con->rx_page), con->cb.base, con->cb.len,
390 len, iov[0].iov_base, r);
392 if (ret < 0)
393 goto out_close;
394 CBUF_EAT(&con->cb, ret);
396 if (CBUF_EMPTY(&con->cb) && !call_again_soon) {
397 __free_page(con->rx_page);
398 con->rx_page = NULL;
401 out:
402 if (call_again_soon)
403 goto out_resched;
404 up_read(&con->sock_sem);
405 ret = 0;
406 goto out_ret;
408 out_resched:
409 lowcomms_data_ready(con->sock->sk, 0);
410 up_read(&con->sock_sem);
411 ret = 0;
412 schedule();
413 goto out_ret;
415 out_close:
416 up_read(&con->sock_sem);
417 if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
418 close_connection(con, FALSE);
419 /* Reconnect when there is something to send */
422 out_ret:
423 return ret;
426 /* Listening socket is busy, accept a connection */
427 static int accept_from_sock(struct connection *con)
429 int result;
430 struct sockaddr_storage peeraddr;
431 struct socket *newsock;
432 int len;
433 int nodeid;
434 struct connection *newcon;
436 memset(&peeraddr, 0, sizeof(peeraddr));
437 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &newsock);
438 if (result < 0)
439 return -ENOMEM;
441 down_read(&con->sock_sem);
443 result = -ENOTCONN;
444 if (con->sock == NULL)
445 goto accept_err;
447 newsock->type = con->sock->type;
448 newsock->ops = con->sock->ops;
450 result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
451 if (result < 0)
452 goto accept_err;
454 /* Get the connected socket's peer */
455 memset(&peeraddr, 0, sizeof(peeraddr));
456 if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
457 &len, 2)) {
458 result = -ECONNABORTED;
459 goto accept_err;
462 /* Get the new node's NODEID */
463 make_sockaddr(&peeraddr, 0, &len);
464 if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
465 printk("dlm: connect from non cluster node\n");
466 sock_release(newsock);
467 up_read(&con->sock_sem);
468 return -1;
471 log_print("got connection from %d", nodeid);
473 /* Check to see if we already have a connection to this node. This
474 * could happen if the two nodes initiate a connection at roughly
475 * the same time and the connections cross on the wire.
476 * TEMPORARY FIX:
477 * In this case we store the incoming one in "othercon"
479 newcon = nodeid2con(nodeid, GFP_KERNEL);
480 if (!newcon) {
481 result = -ENOMEM;
482 goto accept_err;
484 down_write(&newcon->sock_sem);
485 if (newcon->sock) {
486 struct connection *othercon = newcon->othercon;
488 if (!othercon) {
489 othercon = kmem_cache_alloc(con_cache, GFP_KERNEL);
490 if (!othercon) {
491 printk("dlm: failed to allocate incoming socket\n");
492 up_write(&newcon->sock_sem);
493 result = -ENOMEM;
494 goto accept_err;
496 memset(othercon, 0, sizeof(*othercon));
497 othercon->nodeid = nodeid;
498 othercon->rx_action = receive_from_sock;
499 init_rwsem(&othercon->sock_sem);
500 set_bit(CF_IS_OTHERCON, &othercon->flags);
501 newcon->othercon = othercon;
503 othercon->sock = newsock;
504 newsock->sk->sk_user_data = othercon;
505 add_sock(newsock, othercon);
507 else {
508 newsock->sk->sk_user_data = newcon;
509 newcon->rx_action = receive_from_sock;
510 add_sock(newsock, newcon);
514 up_write(&newcon->sock_sem);
517 * Add it to the active queue in case we got data
518 * beween processing the accept adding the socket
519 * to the read_sockets list
521 lowcomms_data_ready(newsock->sk, 0);
522 up_read(&con->sock_sem);
524 return 0;
526 accept_err:
527 up_read(&con->sock_sem);
528 sock_release(newsock);
530 if (result != -EAGAIN)
531 printk("dlm: error accepting connection from node: %d\n", result);
532 return result;
535 /* Connect a new socket to its peer */
536 static int connect_to_sock(struct connection *con)
538 int result = -EHOSTUNREACH;
539 struct sockaddr_storage saddr;
540 int addr_len;
541 struct socket *sock;
543 if (con->nodeid == 0) {
544 log_print("attempt to connect sock 0 foiled");
545 return 0;
548 down_write(&con->sock_sem);
549 if (con->retries++ > MAX_CONNECT_RETRIES)
550 goto out;
552 /* Some odd races can cause double-connects, ignore them */
553 if (con->sock) {
554 result = 0;
555 goto out;
558 /* Create a socket to communicate with */
559 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
560 if (result < 0)
561 goto out_err;
563 memset(&saddr, 0, sizeof(saddr));
564 if (dlm_nodeid_to_addr(con->nodeid, &saddr))
565 goto out_err;
567 sock->sk->sk_user_data = con;
568 con->rx_action = receive_from_sock;
570 make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);
572 add_sock(sock, con);
574 log_print("connecting to %d", con->nodeid);
575 result =
576 sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
577 O_NONBLOCK);
578 if (result == -EINPROGRESS)
579 result = 0;
580 if (result != 0)
581 goto out_err;
583 out:
584 up_write(&con->sock_sem);
586 * Returning an error here means we've given up trying to connect to
587 * a remote node, otherwise we return 0 and reschedule the connetion
588 * attempt
590 return result;
592 out_err:
593 if (con->sock) {
594 sock_release(con->sock);
595 con->sock = NULL;
598 * Some errors are fatal and this list might need adjusting. For other
599 * errors we try again until the max number of retries is reached.
601 if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
602 result != -ENETDOWN && result != EINVAL
603 && result != -EPROTONOSUPPORT) {
604 lowcomms_connect_sock(con);
605 result = 0;
607 goto out;
610 static struct socket *create_listen_sock(struct connection *con, struct sockaddr_storage *saddr)
612 struct socket *sock = NULL;
613 mm_segment_t fs;
614 int result = 0;
615 int one = 1;
616 int addr_len;
618 if (dlm_local_addr.ss_family == AF_INET)
619 addr_len = sizeof(struct sockaddr_in);
620 else
621 addr_len = sizeof(struct sockaddr_in6);
623 /* Create a socket to communicate with */
624 result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM, IPPROTO_TCP, &sock);
625 if (result < 0) {
626 printk("dlm: Can't create listening comms socket\n");
627 goto create_out;
630 fs = get_fs();
631 set_fs(get_ds());
632 result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one));
633 set_fs(fs);
634 if (result < 0) {
635 printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",result);
637 sock->sk->sk_user_data = con;
638 con->rx_action = accept_from_sock;
639 con->sock = sock;
641 /* Bind to our port */
642 make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
643 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
644 if (result < 0) {
645 printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
646 sock_release(sock);
647 sock = NULL;
648 con->sock = NULL;
649 goto create_out;
652 fs = get_fs();
653 set_fs(get_ds());
655 result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&one, sizeof(one));
656 set_fs(fs);
657 if (result < 0) {
658 printk("dlm: Set keepalive failed: %d\n", result);
661 result = sock->ops->listen(sock, 5);
662 if (result < 0) {
663 printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
664 sock_release(sock);
665 sock = NULL;
666 goto create_out;
669 create_out:
670 return sock;
674 /* Listen on all interfaces */
675 static int listen_for_all(void)
677 struct socket *sock = NULL;
678 struct connection *con = nodeid2con(0, GFP_KERNEL);
679 int result = -EINVAL;
681 /* We don't support multi-homed hosts */
682 memset(con, 0, sizeof(*con));
683 init_rwsem(&con->sock_sem);
684 spin_lock_init(&con->writequeue_lock);
685 INIT_LIST_HEAD(&con->writequeue);
686 set_bit(CF_IS_OTHERCON, &con->flags);
688 sock = create_listen_sock(con, &dlm_local_addr);
689 if (sock) {
690 add_sock(sock, con);
691 result = 0;
693 else {
694 result = -EADDRINUSE;
697 return result;
702 static struct writequeue_entry *new_writequeue_entry(struct connection *con,
703 gfp_t allocation)
705 struct writequeue_entry *entry;
707 entry = kmalloc(sizeof(struct writequeue_entry), allocation);
708 if (!entry)
709 return NULL;
711 entry->page = alloc_page(allocation);
712 if (!entry->page) {
713 kfree(entry);
714 return NULL;
717 entry->offset = 0;
718 entry->len = 0;
719 entry->end = 0;
720 entry->users = 0;
721 entry->con = con;
723 return entry;
726 void *dlm_lowcomms_get_buffer(int nodeid, int len,
727 gfp_t allocation, char **ppc)
729 struct connection *con;
730 struct writequeue_entry *e;
731 int offset = 0;
732 int users = 0;
734 if (!atomic_read(&accepting))
735 return NULL;
737 con = nodeid2con(nodeid, allocation);
738 if (!con)
739 return NULL;
741 spin_lock(&con->writequeue_lock);
742 e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
743 if (((struct list_head *) e == &con->writequeue) ||
744 (PAGE_CACHE_SIZE - e->end < len)) {
745 e = NULL;
746 } else {
747 offset = e->end;
748 e->end += len;
749 users = e->users++;
751 spin_unlock(&con->writequeue_lock);
753 if (e) {
754 got_one:
755 if (users == 0)
756 kmap(e->page);
757 *ppc = page_address(e->page) + offset;
758 return e;
761 e = new_writequeue_entry(con, allocation);
762 if (e) {
763 spin_lock(&con->writequeue_lock);
764 offset = e->end;
765 e->end += len;
766 users = e->users++;
767 list_add_tail(&e->list, &con->writequeue);
768 spin_unlock(&con->writequeue_lock);
769 goto got_one;
771 return NULL;
774 void dlm_lowcomms_commit_buffer(void *mh)
776 struct writequeue_entry *e = (struct writequeue_entry *)mh;
777 struct connection *con = e->con;
778 int users;
780 if (!atomic_read(&accepting))
781 return;
783 spin_lock(&con->writequeue_lock);
784 users = --e->users;
785 if (users)
786 goto out;
787 e->len = e->end - e->offset;
788 kunmap(e->page);
789 spin_unlock(&con->writequeue_lock);
791 if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
792 spin_lock_bh(&write_sockets_lock);
793 list_add_tail(&con->write_list, &write_sockets);
794 spin_unlock_bh(&write_sockets_lock);
796 wake_up_interruptible(&lowcomms_send_waitq);
798 return;
800 out:
801 spin_unlock(&con->writequeue_lock);
802 return;
805 static void free_entry(struct writequeue_entry *e)
807 __free_page(e->page);
808 kfree(e);
811 /* Send a message */
812 static int send_to_sock(struct connection *con)
814 int ret = 0;
815 ssize_t(*sendpage) (struct socket *, struct page *, int, size_t, int);
816 const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
817 struct writequeue_entry *e;
818 int len, offset;
820 down_read(&con->sock_sem);
821 if (con->sock == NULL)
822 goto out_connect;
824 sendpage = con->sock->ops->sendpage;
826 spin_lock(&con->writequeue_lock);
827 for (;;) {
828 e = list_entry(con->writequeue.next, struct writequeue_entry,
829 list);
830 if ((struct list_head *) e == &con->writequeue)
831 break;
833 len = e->len;
834 offset = e->offset;
835 BUG_ON(len == 0 && e->users == 0);
836 spin_unlock(&con->writequeue_lock);
838 ret = 0;
839 if (len) {
840 ret = sendpage(con->sock, e->page, offset, len,
841 msg_flags);
842 if (ret == -EAGAIN || ret == 0)
843 goto out;
844 if (ret <= 0)
845 goto send_error;
847 else {
848 /* Don't starve people filling buffers */
849 schedule();
852 spin_lock(&con->writequeue_lock);
853 e->offset += ret;
854 e->len -= ret;
856 if (e->len == 0 && e->users == 0) {
857 list_del(&e->list);
858 free_entry(e);
859 continue;
862 spin_unlock(&con->writequeue_lock);
863 out:
864 up_read(&con->sock_sem);
865 return ret;
867 send_error:
868 up_read(&con->sock_sem);
869 close_connection(con, FALSE);
870 lowcomms_connect_sock(con);
871 return ret;
873 out_connect:
874 up_read(&con->sock_sem);
875 lowcomms_connect_sock(con);
876 return 0;
879 static void clean_one_writequeue(struct connection *con)
881 struct list_head *list;
882 struct list_head *temp;
884 spin_lock(&con->writequeue_lock);
885 list_for_each_safe(list, temp, &con->writequeue) {
886 struct writequeue_entry *e =
887 list_entry(list, struct writequeue_entry, list);
888 list_del(&e->list);
889 free_entry(e);
891 spin_unlock(&con->writequeue_lock);
894 /* Called from recovery when it knows that a node has
895 left the cluster */
896 int dlm_lowcomms_close(int nodeid)
898 struct connection *con;
900 if (!connections)
901 goto out;
903 log_print("closing connection to node %d", nodeid);
904 con = nodeid2con(nodeid, 0);
905 if (con) {
906 clean_one_writequeue(con);
907 close_connection(con, TRUE);
908 atomic_set(&con->waiting_requests, 0);
910 return 0;
912 out:
913 return -1;
916 /* API send message call, may queue the request */
917 /* N.B. This is the old interface - use the new one for new calls */
918 int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
920 struct writequeue_entry *e;
921 char *b;
923 e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
924 if (e) {
925 memcpy(b, buf, len);
926 dlm_lowcomms_commit_buffer(e);
927 return 0;
929 return -ENOBUFS;
932 /* Look for activity on active sockets */
933 static void process_sockets(void)
935 struct list_head *list;
936 struct list_head *temp;
937 int count = 0;
939 spin_lock_bh(&read_sockets_lock);
940 list_for_each_safe(list, temp, &read_sockets) {
942 struct connection *con =
943 list_entry(list, struct connection, read_list);
944 list_del(&con->read_list);
945 clear_bit(CF_READ_PENDING, &con->flags);
947 spin_unlock_bh(&read_sockets_lock);
949 /* This can reach zero if we are processing requests
950 * as they come in.
952 if (atomic_read(&con->waiting_requests) == 0) {
953 spin_lock_bh(&read_sockets_lock);
954 continue;
957 do {
958 con->rx_action(con);
960 /* Don't starve out everyone else */
961 if (++count >= MAX_RX_MSG_COUNT) {
962 schedule();
963 count = 0;
966 } while (!atomic_dec_and_test(&con->waiting_requests) &&
967 !kthread_should_stop());
969 spin_lock_bh(&read_sockets_lock);
971 spin_unlock_bh(&read_sockets_lock);
974 /* Try to send any messages that are pending
976 static void process_output_queue(void)
978 struct list_head *list;
979 struct list_head *temp;
980 int ret;
982 spin_lock_bh(&write_sockets_lock);
983 list_for_each_safe(list, temp, &write_sockets) {
984 struct connection *con =
985 list_entry(list, struct connection, write_list);
986 clear_bit(CF_WRITE_PENDING, &con->flags);
987 list_del(&con->write_list);
989 spin_unlock_bh(&write_sockets_lock);
991 ret = send_to_sock(con);
992 if (ret < 0) {
994 spin_lock_bh(&write_sockets_lock);
996 spin_unlock_bh(&write_sockets_lock);
999 static void process_state_queue(void)
1001 struct list_head *list;
1002 struct list_head *temp;
1003 int ret;
1005 spin_lock_bh(&state_sockets_lock);
1006 list_for_each_safe(list, temp, &state_sockets) {
1007 struct connection *con =
1008 list_entry(list, struct connection, state_list);
1009 list_del(&con->state_list);
1010 clear_bit(CF_CONNECT_PENDING, &con->flags);
1011 spin_unlock_bh(&state_sockets_lock);
1013 ret = connect_to_sock(con);
1014 if (ret < 0) {
1016 spin_lock_bh(&state_sockets_lock);
1018 spin_unlock_bh(&state_sockets_lock);
1022 /* Discard all entries on the write queues */
1023 static void clean_writequeues(void)
1025 int nodeid;
1027 for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
1028 struct connection *con = nodeid2con(nodeid, 0);
1030 if (con)
1031 clean_one_writequeue(con);
1035 static int read_list_empty(void)
1037 int status;
1039 spin_lock_bh(&read_sockets_lock);
1040 status = list_empty(&read_sockets);
1041 spin_unlock_bh(&read_sockets_lock);
1043 return status;
1046 /* DLM Transport comms receive daemon */
1047 static int dlm_recvd(void *data)
1049 init_waitqueue_head(&lowcomms_recv_waitq);
1050 init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
1051 add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
1053 while (!kthread_should_stop()) {
1054 set_current_state(TASK_INTERRUPTIBLE);
1055 if (read_list_empty())
1056 schedule();
1057 set_current_state(TASK_RUNNING);
1059 process_sockets();
1062 return 0;
1065 static int write_and_state_lists_empty(void)
1067 int status;
1069 spin_lock_bh(&write_sockets_lock);
1070 status = list_empty(&write_sockets);
1071 spin_unlock_bh(&write_sockets_lock);
1073 spin_lock_bh(&state_sockets_lock);
1074 if (list_empty(&state_sockets) == 0)
1075 status = 0;
1076 spin_unlock_bh(&state_sockets_lock);
1078 return status;
1081 /* DLM Transport send daemon */
1082 static int dlm_sendd(void *data)
1084 init_waitqueue_head(&lowcomms_send_waitq);
1085 init_waitqueue_entry(&lowcomms_send_waitq_head, current);
1086 add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
1088 while (!kthread_should_stop()) {
1089 set_current_state(TASK_INTERRUPTIBLE);
1090 if (write_and_state_lists_empty())
1091 schedule();
1092 set_current_state(TASK_RUNNING);
1094 process_state_queue();
1095 process_output_queue();
1098 return 0;
1101 static void daemons_stop(void)
1103 kthread_stop(recv_task);
1104 kthread_stop(send_task);
1107 static int daemons_start(void)
1109 struct task_struct *p;
1110 int error;
1112 p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
1113 error = IS_ERR(p);
1114 if (error) {
1115 log_print("can't start dlm_recvd %d", error);
1116 return error;
1118 recv_task = p;
1120 p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
1121 error = IS_ERR(p);
1122 if (error) {
1123 log_print("can't start dlm_sendd %d", error);
1124 kthread_stop(recv_task);
1125 return error;
1127 send_task = p;
1129 return 0;
1133 * Return the largest buffer size we can cope with.
1135 int lowcomms_max_buffer_size(void)
1137 return PAGE_CACHE_SIZE;
1140 void dlm_lowcomms_stop(void)
1142 int i;
1144 atomic_set(&accepting, 0);
1146 /* Set all the activity flags to prevent any
1147 socket activity.
1149 for (i = 0; i < conn_array_size; i++) {
1150 if (connections[i])
1151 connections[i]->flags |= 0x7;
1153 daemons_stop();
1154 clean_writequeues();
1156 for (i = 0; i < conn_array_size; i++) {
1157 if (connections[i]) {
1158 close_connection(connections[i], TRUE);
1159 if (connections[i]->othercon)
1160 kmem_cache_free(con_cache, connections[i]->othercon);
1161 kmem_cache_free(con_cache, connections[i]);
1165 kfree(connections);
1166 connections = NULL;
1168 kmem_cache_destroy(con_cache);
1171 /* This is quite likely to sleep... */
1172 int dlm_lowcomms_start(void)
1174 int error = 0;
1176 error = -ENOTCONN;
1179 * Temporarily initialise the waitq head so that lowcomms_send_message
1180 * doesn't crash if it gets called before the thread is fully
1181 * initialised
1183 init_waitqueue_head(&lowcomms_send_waitq);
1185 error = -ENOMEM;
1186 connections = kmalloc(sizeof(struct connection *) *
1187 NODE_INCREMENT, GFP_KERNEL);
1188 if (!connections)
1189 goto out;
1191 memset(connections, 0,
1192 sizeof(struct connection *) * NODE_INCREMENT);
1194 conn_array_size = NODE_INCREMENT;
1196 if (dlm_our_addr(&dlm_local_addr, 0)) {
1197 log_print("no local IP address has been set");
1198 goto fail_free_conn;
1200 if (!dlm_our_addr(&dlm_local_addr, 1)) {
1201 log_print("This dlm comms module does not support multi-homed clustering");
1202 goto fail_free_conn;
1205 con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1206 __alignof__(struct connection), 0, NULL, NULL);
1207 if (!con_cache)
1208 goto fail_free_conn;
1211 /* Start listening */
1212 error = listen_for_all();
1213 if (error)
1214 goto fail_unlisten;
1216 error = daemons_start();
1217 if (error)
1218 goto fail_unlisten;
1220 atomic_set(&accepting, 1);
1222 return 0;
1224 fail_unlisten:
1225 close_connection(connections[0], 0);
1226 kmem_cache_free(con_cache, connections[0]);
1227 kmem_cache_destroy(con_cache);
1229 fail_free_conn:
1230 kfree(connections);
1232 out:
1233 return error;
1236 int dlm_lowcomms_init(void)
1238 INIT_LIST_HEAD(&read_sockets);
1239 INIT_LIST_HEAD(&write_sockets);
1240 INIT_LIST_HEAD(&state_sockets);
1242 spin_lock_init(&read_sockets_lock);
1243 spin_lock_init(&write_sockets_lock);
1244 spin_lock_init(&state_sockets_lock);
1245 init_MUTEX(&connections_lock);
1247 return 0;
/*
 * Module-exit counterpart of dlm_lowcomms_init().  Nothing to undo:
 * initialisation only set up static lists, locks and a semaphore.
 */
void dlm_lowcomms_exit(void)
{
}
1255 * Overrides for Emacs so that we follow Linus's tabbing style.
1256 * Emacs will notice this stuff at the end of the file and automatically
1257 * adjust the settings for this buffer only. This must remain at the end
1258 * of the file.
1259 * ---------------------------------------------------------------------------
1260 * Local variables:
1261 * c-file-style: "linux"
1262 * End: