2 * Copyright (C) 2011+ Evgeniy Polyakov <zbr@ioremap.net>
14 void *pohmelfs_scratch_buf
;
15 int pohmelfs_scratch_buf_size
= 4096;
17 void pohmelfs_print_addr(struct sockaddr_storage
*addr
, const char *fmt
, ...)
19 struct sockaddr
*sa
= (struct sockaddr
*)addr
;
24 ptr
= kvasprintf(GFP_NOIO
, fmt
, args
);
28 if (sa
->sa_family
== AF_INET
) {
29 struct sockaddr_in
*sin
= (struct sockaddr_in
*)addr
;
30 pr_info("pohmelfs: %pI4:%d: %s", &sin
->sin_addr
.s_addr
, ntohs(sin
->sin_port
), ptr
);
31 } else if (sa
->sa_family
== AF_INET6
) {
32 struct sockaddr_in6
*sin
= (struct sockaddr_in6
*)addr
;
33 pr_info("pohmelfs: %pI6:%d: %s", &sin
->sin6_addr
, ntohs(sin
->sin6_port
), ptr
);
42 * Basic network sending/receiving functions.
43 * Blocked mode is used.
45 int pohmelfs_data_recv(struct pohmelfs_state
*st
, void *buf
, u64 size
, unsigned int flags
)
56 msg
.msg_iov
= (struct iovec
*)&iov
;
60 msg
.msg_control
= NULL
;
61 msg
.msg_controllen
= 0;
62 msg
.msg_flags
= flags
;
64 err
= kernel_recvmsg(st
->sock
, &msg
, &iov
, 1, iov
.iov_len
, msg
.msg_flags
);
72 int pohmelfs_recv(struct pohmelfs_trans
*t
, struct pohmelfs_state
*recv
, void *data
, int size
)
76 err
= pohmelfs_data_recv(recv
, data
, size
, MSG_DONTWAIT
);
84 static int pohmelfs_data_send(struct pohmelfs_trans
*t
)
92 msg
.msg_control
= NULL
;
93 msg
.msg_controllen
= 0;
94 msg
.msg_flags
= MSG_DONTWAIT
;
100 if (t
->io_offset
< t
->header_size
) {
101 io
.iov_base
= (void *)(&t
->cmd
) + t
->io_offset
;
102 io
.iov_len
= t
->header_size
- t
->io_offset
;
104 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, 1, io
.iov_len
);
114 if ((t
->io_offset
>= t
->header_size
) && t
->data
) {
115 size_t sent_size
= t
->io_offset
- t
->header_size
;
116 io
.iov_base
= t
->data
+ sent_size
;
117 io
.iov_len
= t
->data_size
- sent_size
;
119 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, 1, io
.iov_len
);
136 static int pohmelfs_page_send(struct pohmelfs_trans
*t
)
138 struct pohmelfs_write_ctl
*ctl
= t
->wctl
;
144 if (t
->io_offset
< t
->header_size
) {
145 io
.iov_base
= (void *)(&t
->cmd
) + t
->io_offset
;
146 io
.iov_len
= t
->header_size
- t
->io_offset
;
150 msg
.msg_control
= NULL
;
151 msg
.msg_controllen
= 0;
152 msg
.msg_flags
= MSG_DONTWAIT
;
157 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, 1, io
.iov_len
);
167 if (t
->io_offset
>= t
->header_size
) {
168 size_t skip_offset
= 0;
169 size_t size
= le64_to_cpu(t
->cmd
.cmd
.size
) + sizeof(struct dnet_cmd
) - t
->io_offset
;
170 size_t current_io_offset
= t
->io_offset
- t
->header_size
;
172 for (i
= 0; i
< pagevec_count(&ctl
->pvec
); ++i
) {
173 struct page
*page
= ctl
->pvec
.pages
[i
];
174 size_t sz
= PAGE_CACHE_SIZE
;
179 if (current_io_offset
> skip_offset
+ sz
) {
184 sz
-= current_io_offset
- skip_offset
;
186 err
= kernel_sendpage(t
->st
->sock
, page
, current_io_offset
- skip_offset
, sz
, MSG_DONTWAIT
);
188 pr_debug("pohmelfs: %s: %d/%d: total-size: %llu, io-offset: %llu, rest-size: %zd, current-io: %zd, "
189 "skip-offset: %zd, sz: %zu: %d\n",
190 pohmelfs_dump_id(pohmelfs_inode(t
->inode
)->id
.id
), i
, pagevec_count(&ctl
->pvec
),
191 (unsigned long long)le64_to_cpu(t
->cmd
.cmd
.size
) + sizeof(struct dnet_cmd
),
192 t
->io_offset
, size
, current_io_offset
, skip_offset
, sz
, err
);
200 current_io_offset
+= err
;
201 skip_offset
= current_io_offset
;
217 struct pohmelfs_poll_helper
{
219 struct pohmelfs_state
*st
;
222 static int pohmelfs_queue_wake(wait_queue_t
*wait
, unsigned mode
, int sync
, void *key
)
224 struct pohmelfs_state
*st
= container_of(wait
, struct pohmelfs_state
, wait
);
226 if (!st
->conn
->need_exit
)
227 queue_work(st
->conn
->wq
, &st
->io_work
);
231 static void pohmelfs_queue_func(struct file
*file
, wait_queue_head_t
*whead
, poll_table
*pt
)
233 struct pohmelfs_state
*st
= container_of(pt
, struct pohmelfs_poll_helper
, pt
)->st
;
237 init_waitqueue_func_entry(&st
->wait
, pohmelfs_queue_wake
);
238 add_wait_queue(whead
, &st
->wait
);
241 static void pohmelfs_poll_exit(struct pohmelfs_state
*st
)
244 remove_wait_queue(st
->whead
, &st
->wait
);
249 static int pohmelfs_poll_init(struct pohmelfs_state
*st
)
251 struct pohmelfs_poll_helper ph
;
254 init_poll_funcptr(&ph
.pt
, &pohmelfs_queue_func
);
256 st
->sock
->ops
->poll(NULL
, st
->sock
, &ph
.pt
);
260 static int pohmelfs_revents(struct pohmelfs_state
*st
, unsigned mask
)
264 revents
= st
->sock
->ops
->poll(NULL
, st
->sock
, NULL
);
268 if (revents
& (POLLERR
| POLLHUP
| POLLNVAL
| POLLRDHUP
| POLLREMOVE
)) {
269 pohmelfs_print_addr(&st
->sa
, "error revents: %x\n", revents
);
276 static int pohmelfs_state_send(struct pohmelfs_state
*st
)
278 struct pohmelfs_trans
*t
= NULL
;
283 mutex_lock(&st
->trans_lock
);
284 if (!list_empty(&st
->trans_list
))
285 t
= list_first_entry(&st
->trans_list
, struct pohmelfs_trans
, trans_entry
);
286 mutex_unlock(&st
->trans_lock
);
291 err
= pohmelfs_revents(st
, POLLOUT
);
295 size
= le64_to_cpu(t
->cmd
.cmd
.size
) + sizeof(struct dnet_cmd
);
296 pr_debug("pohmelfs: %s: starting sending: %llu/%zd\n", pohmelfs_dump_id(pohmelfs_inode(t
->inode
)->id
.id
), t
->io_offset
, size
);
299 err
= pohmelfs_page_send(t
);
301 err
= pohmelfs_data_send(t
);
303 pr_debug("pohmelfs: %s: sent: %llu/%zd: %d\n", pohmelfs_dump_id(pohmelfs_inode(t
->inode
)->id
.id
), t
->io_offset
, size
, err
);
304 if (!err
&& (t
->io_offset
== size
)) {
305 mutex_lock(&st
->trans_lock
);
306 list_del_init(&t
->trans_entry
);
307 err
= pohmelfs_trans_insert_tree(st
, t
);
311 mutex_unlock(&st
->trans_lock
);
314 BUG_ON(t
->io_offset
> size
);
317 pohmelfs_trans_put(t
);
319 if ((err
< 0) && (err
!= -EAGAIN
))
326 static void pohmelfs_suck_scratch(struct pohmelfs_state
*st
)
328 struct dnet_cmd
*cmd
= &st
->cmd
;
331 pr_debug("pohmelfs_suck_scratch: %llu\n", (unsigned long long)cmd
->size
);
334 int sz
= pohmelfs_scratch_buf_size
;
339 err
= pohmelfs_data_recv(st
, pohmelfs_scratch_buf
, sz
, MSG_WAITALL
);
341 pohmelfs_print_addr(&st
->sa
, "recv-scratch err: %d\n", err
);
352 static int pohmelfs_state_recv(struct pohmelfs_state
*st
)
354 struct dnet_cmd
*cmd
= &st
->cmd
;
355 struct pohmelfs_trans
*t
;
356 unsigned long long trans
;
359 err
= pohmelfs_revents(st
, POLLIN
);
364 err
= pohmelfs_data_recv(st
, cmd
, sizeof(struct dnet_cmd
), MSG_WAITALL
);
369 pohmelfs_print_addr(&st
->sa
, "recv error: %d\n", err
);
373 dnet_convert_cmd(cmd
);
375 trans
= cmd
->trans
& ~DNET_TRANS_REPLY
;
379 t
= pohmelfs_trans_lookup(st
, cmd
);
381 pohmelfs_suck_scratch(st
);
386 if (cmd
->size
&& (t
->io_offset
!= cmd
->size
)) {
387 err
= t
->cb
.recv_reply(t
, st
);
388 if (err
&& (err
!= -EAGAIN
)) {
389 pohmelfs_print_addr(&st
->sa
, "recv-reply error: %d\n", err
);
393 if (t
->io_offset
!= cmd
->size
)
397 err
= t
->cb
.complete(t
, st
);
399 pohmelfs_print_addr(&st
->sa
, "recv-complete err: %d\n", err
);
407 /* only remove and free transaction if there is error or there will be no more replies */
408 if (!(cmd
->flags
& DNET_FLAGS_MORE
) || err
) {
409 pohmelfs_trans_remove(t
);
412 * refcnt was grabbed twice:
413 * in pohmelfs_trans_lookup()
414 * and at transaction creation
416 pohmelfs_trans_put(t
);
420 cmd
->size
-= t
->io_offset
;
425 pohmelfs_trans_put(t
);
430 static void pohmelfs_state_io_work(struct work_struct
*work
)
432 struct pohmelfs_state
*st
= container_of(work
, struct pohmelfs_state
, io_work
);
433 int send_err
, recv_err
;
435 send_err
= recv_err
= -EAGAIN
;
436 while (!st
->conn
->psb
->need_exit
) {
437 send_err
= pohmelfs_state_send(st
);
438 if (send_err
&& (send_err
!= -EAGAIN
)) {
439 pohmelfs_print_addr(&st
->sa
, "state send error: %d\n", send_err
);
443 recv_err
= pohmelfs_state_recv(st
);
444 if (recv_err
&& (recv_err
!= -EAGAIN
)) {
445 pohmelfs_print_addr(&st
->sa
, "state recv error: %d\n", recv_err
);
449 if ((send_err
== -EAGAIN
) && (recv_err
== -EAGAIN
))
454 if ((send_err
&& (send_err
!= -EAGAIN
)) || (recv_err
&& (recv_err
!= -EAGAIN
))) {
455 pohmelfs_state_add_reconnect(st
);
460 struct pohmelfs_state
*pohmelfs_addr_exist(struct pohmelfs_connection
*conn
, struct sockaddr_storage
*sa
, int addrlen
)
462 struct pohmelfs_state
*st
;
464 list_for_each_entry(st
, &conn
->state_list
, state_entry
) {
465 if (st
->addrlen
!= addrlen
)
468 if (!memcmp(&st
->sa
, sa
, addrlen
)) {
476 struct pohmelfs_state
*pohmelfs_state_create(struct pohmelfs_connection
*conn
, struct sockaddr_storage
*sa
, int addrlen
,
477 int ask_route
, int group_id
)
480 struct pohmelfs_state
*st
;
481 struct sockaddr
*addr
= (struct sockaddr
*)sa
;
483 /* early check - this state can be inserted into route table, no need to create state and check again */
484 spin_lock(&conn
->state_lock
);
485 if (pohmelfs_addr_exist(conn
, sa
, addrlen
))
487 spin_unlock(&conn
->state_lock
);
492 st
= kzalloc(sizeof(struct pohmelfs_state
), GFP_KERNEL
);
499 mutex_init(&st
->trans_lock
);
500 INIT_LIST_HEAD(&st
->trans_list
);
501 st
->trans_root
= RB_ROOT
;
503 st
->group_id
= group_id
;
505 kref_init(&st
->refcnt
);
507 INIT_WORK(&st
->io_work
, pohmelfs_state_io_work
);
511 err
= sock_create_kern(addr
->sa_family
, SOCK_STREAM
, IPPROTO_TCP
, &st
->sock
);
513 pohmelfs_print_addr(sa
, "sock_create: failed family: %d, err: %d\n", addr
->sa_family
, err
);
517 st
->sock
->sk
->sk_allocation
= GFP_NOIO
;
518 st
->sock
->sk
->sk_sndtimeo
= st
->sock
->sk
->sk_rcvtimeo
= msecs_to_jiffies(60000);
521 sock_setsockopt(st
->sock
, SOL_SOCKET
, SO_KEEPALIVE
, (char *)&err
, 4);
523 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPIDLE
, (char *)&conn
->psb
->keepalive_idle
, 4);
524 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPINTVL
, (char *)&conn
->psb
->keepalive_interval
, 4);
525 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPCNT
, (char *)&conn
->psb
->keepalive_cnt
, 4);
527 err
= kernel_connect(st
->sock
, (struct sockaddr
*)addr
, addrlen
, 0);
529 pohmelfs_print_addr(sa
, "kernel_connect: failed family: %d, err: %d\n", addr
->sa_family
, err
);
530 goto err_out_release
;
532 st
->sock
->sk
->sk_sndtimeo
= st
->sock
->sk
->sk_rcvtimeo
= msecs_to_jiffies(60000);
534 memcpy(&st
->sa
, sa
, sizeof(struct sockaddr_storage
));
535 st
->addrlen
= addrlen
;
537 err
= pohmelfs_poll_init(st
);
539 goto err_out_shutdown
;
542 spin_lock(&conn
->state_lock
);
544 if (!pohmelfs_addr_exist(conn
, sa
, addrlen
)) {
545 list_add_tail(&st
->state_entry
, &conn
->state_list
);
548 spin_unlock(&conn
->state_lock
);
551 goto err_out_poll_exit
;
554 err
= pohmelfs_route_request(st
);
556 goto err_out_poll_exit
;
559 pohmelfs_print_addr(sa
, "%d: connected\n", st
->conn
->idx
);
564 pohmelfs_poll_exit(st
);
566 st
->sock
->ops
->shutdown(st
->sock
, 2);
568 sock_release(st
->sock
);
572 if (err
!= -EEXIST
) {
573 pohmelfs_print_addr(sa
, "state creation failed: %d\n", err
);
578 static void pohmelfs_state_exit(struct pohmelfs_state
*st
)
583 pohmelfs_poll_exit(st
);
584 st
->sock
->ops
->shutdown(st
->sock
, 2);
586 pohmelfs_print_addr(&st
->sa
, "disconnected\n");
587 sock_release(st
->sock
);
590 static void pohmelfs_state_release(struct kref
*kref
)
592 struct pohmelfs_state
*st
= container_of(kref
, struct pohmelfs_state
, refcnt
);
593 pohmelfs_state_exit(st
);
596 void pohmelfs_state_put(struct pohmelfs_state
*st
)
598 kref_put(&st
->refcnt
, pohmelfs_state_release
);
601 static void pohmelfs_state_clean(struct pohmelfs_state
*st
)
603 struct pohmelfs_trans
*t
, *tmp
;
605 pohmelfs_route_remove_all(st
);
607 mutex_lock(&st
->trans_lock
);
608 list_for_each_entry_safe(t
, tmp
, &st
->trans_list
, trans_entry
) {
609 list_del(&t
->trans_entry
);
611 pohmelfs_trans_put(t
);
615 struct rb_node
*n
= rb_first(&st
->trans_root
);
619 t
= rb_entry(n
, struct pohmelfs_trans
, trans_node
);
621 rb_erase(&t
->trans_node
, &st
->trans_root
);
622 pohmelfs_trans_put(t
);
624 mutex_unlock(&st
->trans_lock
);
626 cancel_work_sync(&st
->io_work
);
629 void pohmelfs_state_kill(struct pohmelfs_state
*st
)
631 BUG_ON(!list_empty(&st
->state_entry
));
633 pohmelfs_state_clean(st
);
634 pohmelfs_state_put(st
);
637 void pohmelfs_state_schedule(struct pohmelfs_state
*st
)
639 if (!st
->conn
->need_exit
)
640 queue_work(st
->conn
->wq
, &st
->io_work
);
643 int pohmelfs_state_add_reconnect(struct pohmelfs_state
*st
)
645 struct pohmelfs_connection
*conn
= st
->conn
;
646 struct pohmelfs_reconnect
*r
, *tmp
;
649 pohmelfs_route_remove_all(st
);
651 r
= kzalloc(sizeof(struct pohmelfs_reconnect
), GFP_NOIO
);
657 memcpy(&r
->sa
, &st
->sa
, sizeof(struct sockaddr_storage
));
658 r
->addrlen
= st
->addrlen
;
659 r
->group_id
= st
->group_id
;
661 mutex_lock(&conn
->reconnect_lock
);
662 list_for_each_entry(tmp
, &conn
->reconnect_list
, reconnect_entry
) {
663 if (tmp
->addrlen
!= r
->addrlen
)
666 if (memcmp(&tmp
->sa
, &r
->sa
, r
->addrlen
))
674 list_add_tail(&r
->reconnect_entry
, &conn
->reconnect_list
);
676 mutex_unlock(&conn
->reconnect_lock
);
681 pohmelfs_print_addr(&st
->sa
, "reconnection added\n");
689 spin_lock(&conn
->state_lock
);
690 list_move(&st
->state_entry
, &conn
->kill_state_list
);
691 spin_unlock(&conn
->state_lock
);
693 /* we do not really care if this work will not be processed immediately */
694 queue_delayed_work(conn
->wq
, &conn
->reconnect_work
, 0);