2 * 2007+ Copyright (c) Evgeniy Polyakov <zbr@ioremap.net>
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include <linux/buffer_head.h>
17 #include <linux/blkdev.h>
18 #include <linux/bio.h>
19 #include <linux/connector.h>
20 #include <linux/dst.h>
21 #include <linux/device.h>
23 #include <linux/in6.h>
24 #include <linux/socket.h>
25 #include <linux/slab.h>
33 struct dst_poll_helper
{
38 static int dst_queue_wake(wait_queue_t
*wait
, unsigned mode
,
41 struct dst_state
*st
= container_of(wait
, struct dst_state
, wait
);
43 wake_up(&st
->thread_wait
);
47 static void dst_queue_func(struct file
*file
, wait_queue_head_t
*whead
,
50 struct dst_state
*st
= container_of(pt
, struct dst_poll_helper
, pt
)->st
;
53 init_waitqueue_func_entry(&st
->wait
, dst_queue_wake
);
54 add_wait_queue(whead
, &st
->wait
);
57 void dst_poll_exit(struct dst_state
*st
)
60 remove_wait_queue(st
->whead
, &st
->wait
);
65 int dst_poll_init(struct dst_state
*st
)
67 struct dst_poll_helper ph
;
70 init_poll_funcptr(&ph
.pt
, &dst_queue_func
);
72 st
->socket
->ops
->poll(NULL
, st
->socket
, &ph
.pt
);
77 * Header receiving function - may block.
79 static int dst_data_recv_header(struct socket
*sock
,
80 void *data
, unsigned int size
, int block
)
89 msg
.msg_iov
= (struct iovec
*)&iov
;
93 msg
.msg_control
= NULL
;
94 msg
.msg_controllen
= 0;
95 msg
.msg_flags
= (block
) ? MSG_WAITALL
: MSG_DONTWAIT
;
97 err
= kernel_recvmsg(sock
, &msg
, &iov
, 1, iov
.iov_len
,
106 * Header sending function - may block.
108 int dst_data_send_header(struct socket
*sock
,
109 void *data
, unsigned int size
, int more
)
118 msg
.msg_iov
= (struct iovec
*)&iov
;
122 msg
.msg_control
= NULL
;
123 msg
.msg_controllen
= 0;
124 msg
.msg_flags
= MSG_WAITALL
| (more
? MSG_MORE
: 0);
126 err
= kernel_sendmsg(sock
, &msg
, &iov
, 1, iov
.iov_len
);
128 dprintk("%s: size: %u, more: %d, err: %d.\n",
129 __func__
, size
, more
, err
);
137 * Block autoconfiguration: request size of the storage and permissions.
139 static int dst_request_remote_config(struct dst_state
*st
)
141 struct dst_node
*n
= st
->node
;
143 struct dst_cmd
*cmd
= st
->data
;
145 memset(cmd
, 0, sizeof(struct dst_cmd
));
148 dst_convert_cmd(cmd
);
150 err
= dst_data_send_header(st
->socket
, cmd
, sizeof(struct dst_cmd
), 0);
154 err
= dst_data_recv_header(st
->socket
, cmd
, sizeof(struct dst_cmd
), 1);
158 dst_convert_cmd(cmd
);
160 if (cmd
->cmd
!= DST_CFG
) {
162 dprintk("%s: checking result: cmd: %d, size reported: %llu.\n",
163 __func__
, cmd
->cmd
, cmd
->sector
);
168 n
->size
= min_t(loff_t
, n
->size
, cmd
->sector
);
170 n
->size
= cmd
->sector
;
172 n
->info
->size
= n
->size
;
173 st
->permissions
= cmd
->rw
;
176 dprintk("%s: n: %p, err: %d, size: %llu, permission: %x.\n",
177 __func__
, n
, err
, n
->size
, st
->permissions
);
185 #define DST_DEFAULT_TIMEO 20000
187 int dst_state_socket_create(struct dst_state
*st
)
191 struct dst_network_ctl
*ctl
= &st
->ctl
;
193 err
= sock_create(ctl
->addr
.sa_family
, ctl
->type
, ctl
->proto
, &sock
);
197 sock
->sk
->sk_sndtimeo
= sock
->sk
->sk_rcvtimeo
=
198 msecs_to_jiffies(DST_DEFAULT_TIMEO
);
199 sock
->sk
->sk_allocation
= GFP_NOIO
;
201 st
->socket
= st
->read_socket
= sock
;
205 void dst_state_socket_release(struct dst_state
*st
)
207 dprintk("%s: st: %p, socket: %p, n: %p.\n",
208 __func__
, st
, st
->socket
, st
->node
);
210 sock_release(st
->socket
);
212 st
->read_socket
= NULL
;
216 void dst_dump_addr(struct socket
*sk
, struct sockaddr
*sa
, char *str
)
218 if (sk
->ops
->family
== AF_INET
) {
219 struct sockaddr_in
*sin
= (struct sockaddr_in
*)sa
;
220 printk(KERN_INFO
"%s %u.%u.%u.%u:%d.\n", str
,
221 NIPQUAD(sin
->sin_addr
.s_addr
), ntohs(sin
->sin_port
));
222 } else if (sk
->ops
->family
== AF_INET6
) {
223 struct sockaddr_in6
*sin
= (struct sockaddr_in6
*)sa
;
224 printk(KERN_INFO
"%s %pi6:%d",
225 str
, &sin
->sin6_addr
, ntohs(sin
->sin6_port
));
229 void dst_state_exit_connected(struct dst_state
*st
)
233 st
->socket
->ops
->shutdown(st
->socket
, 2);
235 dst_dump_addr(st
->socket
, (struct sockaddr
*)&st
->ctl
.addr
,
236 "Disconnected peer");
237 dst_state_socket_release(st
);
241 static int dst_state_init_connected(struct dst_state
*st
)
244 struct dst_network_ctl
*ctl
= &st
->ctl
;
246 err
= dst_state_socket_create(st
);
250 err
= kernel_connect(st
->socket
, (struct sockaddr
*)&st
->ctl
.addr
,
251 st
->ctl
.addr
.sa_data_len
, 0);
253 goto err_out_release
;
255 err
= dst_poll_init(st
);
257 goto err_out_release
;
259 dst_dump_addr(st
->socket
, (struct sockaddr
*)&ctl
->addr
,
260 "Connected to peer");
265 dst_state_socket_release(st
);
/*
 * Reconnect to the remote peer: tear down the current connection and
 * try to set up a fresh one.  The result of the reconnect attempt is
 * deliberately dropped - if it fails, a later attempt will retry.
 *
 * NOTE(review): callers visible in this file call dst_state_unlock()
 * right after this helper, so the state lock is presumably expected to
 * be held on entry - confirm.
 */
static inline void dst_state_reset_nolock(struct dst_state *st)
{
	dst_state_exit_connected(st);
	(void)dst_state_init_connected(st);
}
280 static inline void dst_state_reset(struct dst_state
*st
)
283 dst_state_reset_nolock(st
);
284 dst_state_unlock(st
);
288 * Basic network sending/receiving functions.
289 * Blocked mode is used.
291 static int dst_data_recv_raw(struct dst_state
*st
, void *buf
, u64 size
)
302 msg
.msg_iov
= (struct iovec
*)&iov
;
306 msg
.msg_control
= NULL
;
307 msg
.msg_controllen
= 0;
308 msg
.msg_flags
= MSG_DONTWAIT
;
310 err
= kernel_recvmsg(st
->socket
, &msg
, &iov
, 1, iov
.iov_len
,
313 dprintk("%s: failed to recv data: size: %llu, err: %d.\n",
314 __func__
, size
, err
);
318 dst_state_exit_connected(st
);
325 * Ping command for early detection of failed nodes.
327 static int dst_send_ping(struct dst_state
*st
)
329 struct dst_cmd
*cmd
= st
->data
;
330 int err
= -ECONNRESET
;
334 memset(cmd
, 0, sizeof(struct dst_cmd
));
336 cmd
->cmd
= __cpu_to_be32(DST_PING
);
338 err
= dst_data_send_header(st
->socket
, cmd
,
339 sizeof(struct dst_cmd
), 0);
341 dprintk("%s: st: %p, socket: %p, err: %d.\n", __func__
,
342 st
, st
->socket
, err
);
343 dst_state_unlock(st
);
349 * Receiving function, which should either return error or read
350 * whole block request. If there was no traffic for one second,
351 * send a ping, since the remote node may have died.
353 int dst_data_recv(struct dst_state
*st
, void *data
, unsigned int size
)
355 unsigned int revents
= 0;
356 unsigned int err_mask
= POLLERR
| POLLHUP
| POLLRDHUP
;
357 unsigned int mask
= err_mask
| POLLIN
;
358 struct dst_node
*n
= st
->node
;
361 while (size
&& !err
) {
362 revents
= dst_state_poll(st
);
364 if (!(revents
& mask
)) {
368 prepare_to_wait(&st
->thread_wait
, &wait
,
370 if (!n
->trans_scan_timeout
|| st
->need_exit
)
373 revents
= dst_state_poll(st
);
378 if (signal_pending(current
))
381 if (!schedule_timeout(HZ
)) {
382 err
= dst_send_ping(st
);
389 finish_wait(&st
->thread_wait
, &wait
);
395 if (st
->socket
&& (st
->read_socket
== st
->socket
) &&
396 (revents
& POLLIN
)) {
397 err
= dst_data_recv_raw(st
, data
, size
);
405 if (revents
& err_mask
|| !st
->socket
) {
406 dprintk("%s: revents: %x, socket: %p, size: %u, "
407 "err: %d.\n", __func__
, revents
,
408 st
->socket
, size
, err
);
412 dst_state_unlock(st
);
414 if (!n
->trans_scan_timeout
)
422 * Send block autoconf reply.
424 static int dst_process_cfg(struct dst_state
*st
)
426 struct dst_node
*n
= st
->node
;
427 struct dst_cmd
*cmd
= st
->data
;
430 cmd
->sector
= n
->size
;
431 cmd
->rw
= st
->permissions
;
433 dst_convert_cmd(cmd
);
436 err
= dst_data_send_header(st
->socket
, cmd
, sizeof(struct dst_cmd
), 0);
437 dst_state_unlock(st
);
443 * Receive block IO from the network.
445 static int dst_recv_bio(struct dst_state
*st
, struct bio
*bio
,
446 unsigned int total_size
)
453 bio_for_each_segment(bv
, bio
, i
) {
454 sz
= min(total_size
, bv
->bv_len
);
456 dprintk("%s: bio: %llu/%u, total: %u, len: %u, sz: %u, "
457 "off: %u.\n", __func__
, (u64
)bio
->bi_sector
,
458 bio
->bi_size
, total_size
, bv
->bv_len
, sz
,
461 data
= kmap(bv
->bv_page
) + bv
->bv_offset
;
462 err
= dst_data_recv(st
, data
, sz
);
479 * Our block IO has just completed and arrived: get it.
481 static int dst_process_io_response(struct dst_state
*st
)
483 struct dst_node
*n
= st
->node
;
484 struct dst_cmd
*cmd
= st
->data
;
489 mutex_lock(&n
->trans_lock
);
490 t
= dst_trans_search(n
, cmd
->id
);
491 mutex_unlock(&n
->trans_lock
);
498 dprintk("%s: bio: %llu/%u, cmd_size: %u, csize: %u, dir: %lu.\n",
499 __func__
, (u64
)bio
->bi_sector
, bio
->bi_size
, cmd
->size
,
500 cmd
->csize
, bio_data_dir(bio
));
502 if (bio_data_dir(bio
) == READ
) {
503 if (bio
->bi_size
!= cmd
->size
- cmd
->csize
)
506 if (dst_need_crypto(n
)) {
507 err
= dst_recv_cdata(st
, t
->cmd
.hash
);
512 err
= dst_recv_bio(st
, t
->bio
, bio
->bi_size
);
516 if (dst_need_crypto(n
))
517 return dst_trans_crypto(t
);
520 if (cmd
->size
|| cmd
->csize
)
534 * Receive crypto data.
536 int dst_recv_cdata(struct dst_state
*st
, void *cdata
)
538 struct dst_cmd
*cmd
= st
->data
;
539 struct dst_node
*n
= st
->node
;
540 struct dst_crypto_ctl
*c
= &n
->crypto
;
543 if (cmd
->csize
!= c
->crypto_attached_size
) {
544 dprintk("%s: cmd: cmd: %u, sector: %llu, size: %u, "
545 "csize: %u != digest size %u.\n",
546 __func__
, cmd
->cmd
, cmd
->sector
, cmd
->size
,
547 cmd
->csize
, c
->crypto_attached_size
);
552 err
= dst_data_recv(st
, cdata
, cmd
->csize
);
556 cmd
->size
-= cmd
->csize
;
564 * Receive the command and start its processing.
566 static int dst_recv_processing(struct dst_state
*st
)
569 struct dst_cmd
*cmd
= st
->data
;
572 * If socket will be reset after this statement, then
573 * dst_data_recv() will just fail and loop will
574 * start again, so it can be done without any locks.
576 * st->read_socket is needed to prevent the state machine from
577 * breaking between this data reading and subsequent one
578 * in protocol specific functions during connection reset.
579 * In case of reset we have to read next command and do
580 * not expect data for old command to magically appear in
583 st
->read_socket
= st
->socket
;
584 err
= dst_data_recv(st
, cmd
, sizeof(struct dst_cmd
));
588 dst_convert_cmd(cmd
);
590 dprintk("%s: cmd: %u, size: %u, csize: %u, id: %llu, "
591 "sector: %llu, flags: %llx, rw: %llx.\n",
592 __func__
, cmd
->cmd
, cmd
->size
,
593 cmd
->csize
, cmd
->id
, cmd
->sector
,
594 cmd
->flags
, cmd
->rw
);
597 * This should catch protocol breakage and random garbage
598 * instead of commands.
600 if (unlikely(cmd
->csize
> st
->size
- sizeof(struct dst_cmd
))) {
607 case DST_IO_RESPONSE
:
608 err
= dst_process_io_response(st
);
611 err
= dst_process_io(st
);
614 err
= dst_process_cfg(st
);
628 * Receiving thread. For the client node we should try to reconnect,
629 * for accepted client we just drop the state and expect it to reconnect.
631 static int dst_recv(void *init_data
, void *schedule_data
)
633 struct dst_state
*st
= schedule_data
;
634 struct dst_node
*n
= init_data
;
637 dprintk("%s: start st: %p, n: %p, scan: %lu, need_exit: %d.\n",
638 __func__
, st
, n
, n
->trans_scan_timeout
, st
->need_exit
);
640 while (n
->trans_scan_timeout
&& !st
->need_exit
) {
641 err
= dst_recv_processing(st
);
646 if (!n
->trans_scan_timeout
|| st
->need_exit
)
655 wake_up(&st
->thread_wait
);
657 dprintk("%s: freeing receiving socket st: %p.\n", __func__
, st
);
659 dst_state_exit_connected(st
);
660 dst_state_unlock(st
);
663 dprintk("%s: freed receiving socket st: %p.\n", __func__
, st
);
669 * Network state dies here and is born a couple of lines below.
670 * This object is the main network state processing engine:
671 * sending, receiving, reconnections, all network related
672 * tasks are handled on behalf of the state.
674 static void dst_state_free(struct dst_state
*st
)
676 dprintk("%s: st: %p.\n", __func__
, st
);
683 struct dst_state
*dst_state_alloc(struct dst_node
*n
)
685 struct dst_state
*st
;
688 st
= kzalloc(sizeof(struct dst_state
), GFP_KERNEL
);
695 st
->size
= PAGE_SIZE
;
696 st
->data
= kmalloc(st
->size
, GFP_KERNEL
);
700 spin_lock_init(&st
->request_lock
);
701 INIT_LIST_HEAD(&st
->request_list
);
703 mutex_init(&st
->state_lock
);
704 init_waitqueue_head(&st
->thread_wait
);
707 * One for processing thread, another one for node itself.
709 atomic_set(&st
->refcnt
, 2);
711 dprintk("%s: st: %p, n: %p.\n", __func__
, st
, st
->node
);
721 int dst_state_schedule_receiver(struct dst_state
*st
)
723 return thread_pool_schedule_private(st
->node
->pool
, dst_thread_setup
,
724 dst_recv
, st
, MAX_SCHEDULE_TIMEOUT
, st
->node
);
728 * Initialize client's connection to the remote peer: allocate state,
729 * connect and perform block IO autoconfiguration.
731 int dst_node_init_connected(struct dst_node
*n
, struct dst_network_ctl
*r
)
733 struct dst_state
*st
;
736 st
= dst_state_alloc(n
);
741 memcpy(&st
->ctl
, r
, sizeof(struct dst_network_ctl
));
743 err
= dst_state_init_connected(st
);
745 goto err_out_free_data
;
747 err
= dst_request_remote_config(st
);
749 goto err_out_exit_connected
;
752 err
= dst_state_schedule_receiver(st
);
754 goto err_out_exit_connected
;
758 err_out_exit_connected
:
759 dst_state_exit_connected(st
);
767 void dst_state_put(struct dst_state
*st
)
769 dprintk("%s: st: %p, refcnt: %d.\n",
770 __func__
, st
, atomic_read(&st
->refcnt
));
771 if (atomic_dec_and_test(&st
->refcnt
))
776 * Send block IO to the network one by one using zero-copy ->sendpage().
778 int dst_send_bio(struct dst_state
*st
, struct dst_cmd
*cmd
, struct bio
*bio
)
781 struct dst_crypto_ctl
*c
= &st
->node
->crypto
;
783 int flags
= MSG_WAITALL
;
785 err
= dst_data_send_header(st
->socket
, cmd
,
786 sizeof(struct dst_cmd
) + c
->crypto_attached_size
, bio
->bi_vcnt
);
790 bio_for_each_segment(bv
, bio
, i
) {
791 if (i
< bio
->bi_vcnt
- 1)
794 err
= kernel_sendpage(st
->socket
, bv
->bv_page
, bv
->bv_offset
,
803 dprintk("%s: %d/%d, flags: %x, err: %d.\n",
804 __func__
, i
, bio
->bi_vcnt
, flags
, err
);
809 * Send transaction to the remote peer.
811 int dst_trans_send(struct dst_trans
*t
)
814 struct dst_state
*st
= t
->n
->state
;
815 struct bio
*bio
= t
->bio
;
817 dst_convert_cmd(&t
->cmd
);
821 err
= dst_state_init_connected(st
);
826 if (bio_data_dir(bio
) == WRITE
) {
827 err
= dst_send_bio(st
, &t
->cmd
, t
->bio
);
829 err
= dst_data_send_header(st
->socket
, &t
->cmd
,
830 sizeof(struct dst_cmd
), 0);
835 dst_state_unlock(st
);
839 dst_state_reset_nolock(st
);
841 dst_state_unlock(st
);