2 * Copyright (C) 2011+ Evgeniy Polyakov <zbr@ioremap.net>
/*
 * Scratch receive buffer: pohmelfs_suck_scratch() hands this pointer to
 * pohmelfs_data_recv() as the destination for data it reads off the socket.
 * It is not assigned anywhere in the visible lines.
 * NOTE(review): allocation/free presumably happen elsewhere - confirm.
 */
14 void *pohmelfs_scratch_buf
;
/*
 * Size used for scratch reads; pohmelfs_suck_scratch() copies it into its
 * local chunk size before calling pohmelfs_data_recv(). Defaults to 4096.
 */
15 int pohmelfs_scratch_buf_size
= 4096;
/*
 * Log a printf-style message prefixed with the peer address and port.
 *
 * @addr: peer address; it is viewed as a struct sockaddr to read sa_family.
 * @fmt:  printf-style format, expanded with kvasprintf(GFP_NOIO, ...).
 *
 * AF_INET addresses are printed with "%pI4:%d", AF_INET6 addresses with
 * "%pI6:%d"; the port is converted with ntohs() in both cases.
 *
 * NOTE(review): the va_start()/va_end() pair, any NULL check on the
 * kvasprintf() result, the kfree() of that buffer and the handling of other
 * address families are not visible in this listing - confirm against the
 * full source.
 */
17 void pohmelfs_print_addr(struct sockaddr_storage
*addr
, const char *fmt
, ...)
19 struct sockaddr
*sa
= (struct sockaddr
*)addr
;
24 ptr
= kvasprintf(GFP_NOIO
, fmt
, args
);
28 if (sa
->sa_family
== AF_INET
) {
29 struct sockaddr_in
*sin
= (struct sockaddr_in
*)addr
;
30 pr_info("pohmelfs: %pI4:%d: %s", &sin
->sin_addr
.s_addr
, ntohs(sin
->sin_port
), ptr
);
31 } else if (sa
->sa_family
== AF_INET6
) {
32 struct sockaddr_in6
*sin
= (struct sockaddr_in6
*)addr
;
33 pr_info("pohmelfs: %pI6:%d: %s", &sin
->sin6_addr
, ntohs(sin
->sin6_port
), ptr
);
42 * Basic network sending/receiving functions.
43 * Blocking mode is used.
/*
 * Receive data from the state's socket with kernel_recvmsg().
 *
 * @st:    state whose ->sock is read.
 * @buf:   destination buffer.
 * @size:  requested length.
 * @flags: receive flags; stored in msg.msg_flags and passed on to
 *         kernel_recvmsg().
 *
 * A single iovec is used and no control (ancillary) data is requested.
 *
 * NOTE(review): the lines that fill iov from @buf/@size and the handling of
 * the kernel_recvmsg() result are not visible in this listing, so the exact
 * return contract cannot be stated from here.
 */
45 int pohmelfs_data_recv(struct pohmelfs_state
*st
, void *buf
, u64 size
, unsigned int flags
)
56 msg
.msg_iov
= (struct iovec
*)&iov
;
60 msg
.msg_control
= NULL
;
61 msg
.msg_controllen
= 0;
62 msg
.msg_flags
= flags
;
64 err
= kernel_recvmsg(st
->sock
, &msg
, &iov
, 1, iov
.iov_len
, msg
.msg_flags
);
/*
 * Non-blocking receive on behalf of a transaction.
 *
 * Calls pohmelfs_data_recv() on @recv with MSG_DONTWAIT and adds the result
 * to t->recv_offset.
 *
 * NOTE(review): the check between the receive and the offset update is not
 * visible here; presumably the offset is only advanced when the receive did
 * not fail - confirm.
 */
75 int pohmelfs_recv(struct pohmelfs_trans
*t
, struct pohmelfs_state
*recv
, void *data
, int size
)
79 err
= pohmelfs_data_recv(recv
, data
, size
, MSG_DONTWAIT
);
83 t
->recv_offset
+= err
;
/*
 * Send a transaction whose payload lives in a flat buffer.
 *
 * Vector 0 is the command header (&t->cmd, t->header_size bytes); vector 1
 * is the payload (t->data, t->data_size bytes). Both go out through one
 * kernel_sendmsg() call on t->st->sock with MSG_WAITALL and no control data;
 * the total length passed is data_size + header_size.
 *
 * NOTE(review): how ionum is chosen and how the kernel_sendmsg() result is
 * turned into the return value are not visible in this listing.
 */
87 static int pohmelfs_data_send(struct pohmelfs_trans
*t
)
93 io
[0].iov_base
= &t
->cmd
;
94 io
[0].iov_len
= t
->header_size
;
97 io
[1].iov_base
= t
->data
;
98 io
[1].iov_len
= t
->data_size
;
104 msg
.msg_control
= NULL
;
105 msg
.msg_controllen
= 0;
106 msg
.msg_flags
= MSG_WAITALL
;
109 msg
.msg_iovlen
= ionum
;
111 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, ionum
, t
->data_size
+ t
->header_size
);
/*
 * Send a transaction whose payload is a set of pages (t->wctl->pvec).
 *
 * The command header (&t->cmd, t->header_size bytes) is sent first with
 * kernel_sendmsg(); then every page in the pagevec is pushed with
 * kernel_sendpage(). For each page the in-page offset is
 * offset & (PAGE_CACHE_SIZE - 1) and the length is the remainder of the page.
 *
 * size/offset are taken from t->cmd.p.io and converted with le64_to_cpu().
 *
 * NOTE(review): clamping of sz against the remaining size, advancing of
 * offset between pages and error handling are not visible in this listing.
 * NOTE(review): offset is declared pgoff_t but is loaded from a 64-bit
 * field; if pgoff_t is narrower than 64 bits on some configuration the upper
 * bits are dropped - only the low in-page bits are used in the visible
 * code, confirm the rest.
 */
124 static int pohmelfs_page_send(struct pohmelfs_trans
*t
)
126 struct pohmelfs_write_ctl
*ctl
= t
->wctl
;
127 size_t size
= le64_to_cpu(t
->cmd
.p
.io
.size
);
128 pgoff_t offset
= le64_to_cpu(t
->cmd
.p
.io
.offset
);
134 io
.iov_base
= &t
->cmd
;
135 io
.iov_len
= t
->header_size
;
139 msg
.msg_control
= NULL
;
140 msg
.msg_controllen
= 0;
141 msg
.msg_flags
= MSG_WAITALL
;
146 err
= kernel_sendmsg(t
->st
->sock
, &msg
, (struct kvec
*)msg
.msg_iov
, 1, t
->header_size
);
153 for (i
= 0; i
< pagevec_count(&ctl
->pvec
); ++i
) {
154 struct page
*page
= ctl
->pvec
.pages
[i
];
155 pgoff_t off
= offset
& (PAGE_CACHE_SIZE
- 1);
156 size_t sz
= PAGE_CACHE_SIZE
- off
;
161 err
= kernel_sendpage(t
->st
->sock
, page
, off
, sz
, msg
.msg_flags
);
/*
 * Glue between a poll_table callback and the state it belongs to:
 * pohmelfs_queue_func() recovers this helper from the poll_table pointer
 * with container_of() (member "pt") and then reads ->st.
 * NOTE(review): the declaration of the "pt" member is not visible in this
 * listing.
 */
185 struct pohmelfs_poll_helper
{
187 struct pohmelfs_state
*st
;
/*
 * Wait-queue wake callback installed by pohmelfs_queue_func().
 *
 * Recovers the state from its embedded "wait" entry and, unless the
 * connection is shutting down (conn->need_exit), queues the state's
 * recv_work on the connection workqueue.
 *
 * @mode, @sync and @key are not used in the visible lines.
 * NOTE(review): the return statement is not visible in this listing.
 */
190 static int pohmelfs_queue_wake(wait_queue_t
*wait
, unsigned mode
, int sync
, void *key
)
192 struct pohmelfs_state
*st
= container_of(wait
, struct pohmelfs_state
, wait
);
194 if (!st
->conn
->need_exit
)
195 queue_work(st
->conn
->wq
, &st
->recv_work
);
/*
 * poll_table callback: hooks the state into the wait queue head @whead.
 *
 * The state is found through the enclosing struct pohmelfs_poll_helper;
 * its "wait" entry is initialised with pohmelfs_queue_wake() as the wake
 * function and then added to @whead.
 *
 * @file is not used in the visible lines.
 * NOTE(review): pohmelfs_poll_exit() later removes the entry from
 * st->whead; the line that records @whead in the state is not visible in
 * this listing - presumably it sits between the two visible statements.
 */
199 static void pohmelfs_queue_func(struct file
*file
, wait_queue_head_t
*whead
, poll_table
*pt
)
201 struct pohmelfs_state
*st
= container_of(pt
, struct pohmelfs_poll_helper
, pt
)->st
;
205 init_waitqueue_func_entry(&st
->wait
, pohmelfs_queue_wake
);
206 add_wait_queue(whead
, &st
->wait
);
/*
 * Undo pohmelfs_poll_init(): take the state's wait entry off the wait queue
 * head stored in st->whead.
 * NOTE(review): any guard around the removal (e.g. a check that whead is
 * set) is not visible in this listing.
 */
209 static void pohmelfs_poll_exit(struct pohmelfs_state
*st
)
212 remove_wait_queue(st
->whead
, &st
->wait
);
/*
 * Register the state for wake-ups on its socket.
 *
 * Builds an on-stack poll helper whose poll_table callback is
 * pohmelfs_queue_func(), then calls the socket's ->poll() once with that
 * table so the callback runs and attaches the state's wait entry.
 *
 * NOTE(review): the assignment of ph.st and the return value are not
 * visible in this listing.
 */
217 static int pohmelfs_poll_init(struct pohmelfs_state
*st
)
219 struct pohmelfs_poll_helper ph
;
222 init_poll_funcptr(&ph
.pt
, &pohmelfs_queue_func
);
224 st
->sock
->ops
->poll(NULL
, st
->sock
, &ph
.pt
);
/*
 * Workqueue handler that drains the state's pending-transaction list.
 *
 * While the connection is not exiting:
 *  - under st->trans_lock, take the first transaction off st->trans_list
 *    and insert it into the state's transaction tree
 *    (pohmelfs_trans_insert_tree());
 *  - send it with pohmelfs_page_send() or pohmelfs_data_send();
 *  - drop a reference with pohmelfs_trans_put().
 * A send failure is logged ("send error") and the state is handed to
 * pohmelfs_state_add_reconnect().
 *
 * NOTE(review): the conditions that choose between page and data send, the
 * loop exit on an empty list and the exact error branches are not visible
 * in this listing.
 */
228 static void pohmelfs_state_send_work(struct work_struct
*work
)
230 struct pohmelfs_state
*st
= container_of(work
, struct pohmelfs_state
, send_work
);
231 struct pohmelfs_trans
*t
;
235 while (!st
->conn
->need_exit
) {
239 mutex_lock(&st
->trans_lock
);
240 if (!list_empty(&st
->trans_list
)) {
241 t
= list_first_entry(&st
->trans_list
, struct pohmelfs_trans
, trans_entry
);
242 list_del_init(&t
->trans_entry
);
243 err
= pohmelfs_trans_insert_tree(st
, t
);
247 mutex_unlock(&st
->trans_lock
);
253 err
= pohmelfs_page_send(t
);
255 err
= pohmelfs_data_send(t
);
258 pohmelfs_trans_put(t
);
261 pohmelfs_print_addr(&st
->sa
, "send error: %d\n", err
);
263 pohmelfs_state_add_reconnect(st
);
/*
 * Read data for the current command (st->cmd) into the shared scratch
 * buffer. The caller in this file invokes it right before skipping a reply
 * (see pohmelfs_state_recv_work()).
 *
 * The visible lines log cmd->size, take pohmelfs_scratch_buf_size as the
 * chunk size, receive into pohmelfs_scratch_buf with MSG_WAITALL and log
 * "recv-scratch err" for a failure.
 *
 * NOTE(review): the surrounding loop, the clamping of sz to the remaining
 * cmd->size and the bookkeeping of cmd->size are not visible in this
 * listing.
 */
269 static void pohmelfs_suck_scratch(struct pohmelfs_state
*st
)
271 struct dnet_cmd
*cmd
= &st
->cmd
;
274 pr_debug("pohmelfs_suck_scratch: %llu\n", (unsigned long long)cmd
->size
);
277 int sz
= pohmelfs_scratch_buf_size
;
282 err
= pohmelfs_data_recv(st
, pohmelfs_scratch_buf
, sz
, MSG_WAITALL
);
284 pohmelfs_print_addr(&st
->sa
, "recv-scratch err: %d\n", err
);
/*
 * Workqueue handler that processes incoming replies on the state's socket.
 *
 * Per iteration, while the connection is not exiting:
 *  1. poll the socket (NULL poll table) and test for POLLIN;
 *  2. receive a struct dnet_cmd header into st->cmd with MSG_WAITALL and
 *     convert it with dnet_convert_cmd();
 *  3. look the transaction up with pohmelfs_trans_lookup(); the
 *     pohmelfs_suck_scratch() + "goto err_out_continue" pair is the path
 *     that skips a reply;
 *  4. if the command carries data that has not been fully received
 *     (t->recv_offset != cmd->size), call t->cb.recv_reply(); errors other
 *     than -EAGAIN are logged, and an incomplete reply jumps to
 *     err_out_continue_put;
 *  5. call t->cb.complete();
 *  6. when DNET_FLAGS_MORE is clear or an error occurred, remove the
 *     transaction and drop the extra reference;
 *  7. drop the lookup reference at err_out_continue_put.
 * After the loop-level handling, an error other than -EAGAIN schedules a
 * reconnect through pohmelfs_state_add_reconnect().
 *
 * NOTE(review): several branch conditions, break/continue statements and
 * the err_out_continue label are not visible in this listing; the step list
 * above follows the visible statements only.
 */
295 static void pohmelfs_state_recv_work(struct work_struct
*work
)
297 struct pohmelfs_state
*st
= container_of(work
, struct pohmelfs_state
, recv_work
);
298 struct dnet_cmd
*cmd
= &st
->cmd
;
299 struct pohmelfs_trans
*t
;
300 unsigned long long trans
;
301 unsigned int revents
;
304 while (!st
->conn
->need_exit
) {
305 revents
= st
->sock
->ops
->poll(NULL
, st
->sock
, NULL
);
306 if (!(revents
& POLLIN
))
310 err
= pohmelfs_data_recv(st
, cmd
, sizeof(struct dnet_cmd
), MSG_WAITALL
);
312 pohmelfs_print_addr(&st
->sa
, "recv error: %d\n", err
);
316 dnet_convert_cmd(cmd
);
318 trans
= cmd
->trans
& ~DNET_TRANS_REPLY
;
322 t
= pohmelfs_trans_lookup(st
, cmd
);
324 pohmelfs_suck_scratch(st
);
327 goto err_out_continue
;
329 if (cmd
->size
&& (t
->recv_offset
!= cmd
->size
)) {
330 err
= t
->cb
.recv_reply(t
, st
);
331 if (err
&& (err
!= -EAGAIN
)) {
332 pohmelfs_print_addr(&st
->sa
, "recv-reply error: %d\n", err
);
336 if (t
->recv_offset
!= cmd
->size
)
337 goto err_out_continue_put
;
340 err
= t
->cb
.complete(t
, st
);
342 pohmelfs_print_addr(&st
->sa
, "recv-complete err: %d\n", err
);
350 /* only remove and free transaction if there is error or there will be no more replies */
351 if (!(cmd
->flags
& DNET_FLAGS_MORE
) || err
) {
352 pohmelfs_trans_remove(t
);
355 * refcnt was grabbed twice:
356 * in pohmelfs_trans_lookup()
357 * and at transaction creation
359 pohmelfs_trans_put(t
);
363 cmd
->size
-= t
->recv_offset
;
366 err_out_continue_put
:
367 pohmelfs_trans_put(t
);
369 if (err
&& (err
!= -EAGAIN
)) {
370 //pohmelfs_suck_scratch(st);
378 if (err
&& err
!= -EAGAIN
)
379 pohmelfs_state_add_reconnect(st
);
/*
 * Search conn->state_list for a state with the same address.
 *
 * Entries whose addrlen differs from @addrlen are skipped; the remaining
 * ones are compared with memcmp() over @addrlen bytes.
 *
 * Both visible callers in pohmelfs_state_create() call this with
 * conn->state_lock held.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * callers test the result for NULL/non-NULL, so presumably the matching
 * state is returned, or NULL when there is none - confirm.
 */
383 struct pohmelfs_state
*pohmelfs_addr_exist(struct pohmelfs_connection
*conn
, struct sockaddr_storage
*sa
, int addrlen
)
385 struct pohmelfs_state
*st
;
387 list_for_each_entry(st
, &conn
->state_list
, state_entry
) {
388 if (st
->addrlen
!= addrlen
)
391 if (!memcmp(&st
->sa
, sa
, addrlen
)) {
/*
 * Create a network state for the peer @sa and connect to it.
 *
 * @conn:      owning connection.
 * @sa:        peer address (copied into st->sa).
 * @addrlen:   length of @sa passed to kernel_connect() and stored in
 *             st->addrlen.
 * @ask_route: not referenced in the visible lines; presumably gates the
 *             pohmelfs_route_request() call - confirm.
 * @group_id:  stored in st->group_id.
 *
 * Visible steps, in order:
 *  - early duplicate check with pohmelfs_addr_exist() under
 *    conn->state_lock;
 *  - kzalloc() the state and initialise its lock, list, rb-tree root,
 *    refcount and the send/recv work items;
 *  - create a TCP socket, set sk_allocation to GFP_NOIO and a 60 s
 *    send/receive timeout, enable keepalive and its tunables from conn->psb;
 *  - kernel_connect(), then record address and length in the state;
 *  - pohmelfs_poll_init() to get wake-ups from the socket;
 *  - re-check for a duplicate under conn->state_lock and add the state to
 *    conn->state_list if there is none;
 *  - pohmelfs_route_request() and a "connected" log line.
 * The cleanup calls run in the order pohmelfs_poll_exit(), socket shutdown,
 * sock_release(); the final failure log is suppressed for -EEXIST.
 *
 * NOTE(review): SO_KEEPALIVE is given &err as its option value; the line
 * that sets err beforehand is not visible in this listing - confirm it
 * holds the intended value.
 * NOTE(review): the 60000 ms socket timeouts are assigned twice with the
 * same value (before and after connect).
 * NOTE(review): allocation-failure handling, the assignment of st->conn,
 * the cleanup labels themselves, freeing of st and the return statements
 * are not visible in this listing.
 */
399 struct pohmelfs_state
*pohmelfs_state_create(struct pohmelfs_connection
*conn
, struct sockaddr_storage
*sa
, int addrlen
,
400 int ask_route
, int group_id
)
403 struct pohmelfs_state
*st
;
404 struct sockaddr
*addr
= (struct sockaddr
*)sa
;
406 /* early check - this state can be inserted into route table, no need to create state and check again */
407 spin_lock(&conn
->state_lock
);
408 if (pohmelfs_addr_exist(conn
, sa
, addrlen
))
410 spin_unlock(&conn
->state_lock
);
415 st
= kzalloc(sizeof(struct pohmelfs_state
), GFP_KERNEL
);
422 mutex_init(&st
->trans_lock
);
423 INIT_LIST_HEAD(&st
->trans_list
);
424 st
->trans_root
= RB_ROOT
;
426 st
->group_id
= group_id
;
428 kref_init(&st
->refcnt
);
430 INIT_WORK(&st
->send_work
, pohmelfs_state_send_work
);
431 INIT_WORK(&st
->recv_work
, pohmelfs_state_recv_work
);
435 err
= sock_create_kern(addr
->sa_family
, SOCK_STREAM
, IPPROTO_TCP
, &st
->sock
);
437 pohmelfs_print_addr(sa
, "sock_create: failed family: %d, err: %d\n", addr
->sa_family
, err
);
441 st
->sock
->sk
->sk_allocation
= GFP_NOIO
;
442 st
->sock
->sk
->sk_sndtimeo
= st
->sock
->sk
->sk_rcvtimeo
= msecs_to_jiffies(60000);
445 sock_setsockopt(st
->sock
, SOL_SOCKET
, SO_KEEPALIVE
, (char *)&err
, 4);
447 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPIDLE
, (char *)&conn
->psb
->keepalive_idle
, 4);
448 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPINTVL
, (char *)&conn
->psb
->keepalive_interval
, 4);
449 tcp_setsockopt(st
->sock
->sk
, SOL_TCP
, TCP_KEEPCNT
, (char *)&conn
->psb
->keepalive_cnt
, 4);
451 err
= kernel_connect(st
->sock
, (struct sockaddr
*)addr
, addrlen
, 0);
453 pohmelfs_print_addr(sa
, "kernel_connect: failed family: %d, err: %d\n", addr
->sa_family
, err
);
454 goto err_out_release
;
456 st
->sock
->sk
->sk_sndtimeo
= st
->sock
->sk
->sk_rcvtimeo
= msecs_to_jiffies(60000);
458 memcpy(&st
->sa
, sa
, sizeof(struct sockaddr_storage
));
459 st
->addrlen
= addrlen
;
461 err
= pohmelfs_poll_init(st
);
463 goto err_out_shutdown
;
466 spin_lock(&conn
->state_lock
);
468 if (!pohmelfs_addr_exist(conn
, sa
, addrlen
)) {
469 list_add_tail(&st
->state_entry
, &conn
->state_list
);
472 spin_unlock(&conn
->state_lock
);
475 goto err_out_poll_exit
;
478 err
= pohmelfs_route_request(st
);
480 goto err_out_poll_exit
;
483 pohmelfs_print_addr(sa
, "%d: connected\n", st
->conn
->idx
);
488 pohmelfs_poll_exit(st
);
490 st
->sock
->ops
->shutdown(st
->sock
, 2);
492 sock_release(st
->sock
);
496 if (err
!= -EEXIST
) {
497 pohmelfs_print_addr(sa
, "state creation failed: %d\n", err
);
/*
 * Tear down the state's network side: detach from the socket wait queue,
 * shut the socket down (how == 2), log "disconnected" and release the
 * socket.
 * NOTE(review): any guard before the teardown and the freeing of @st are
 * not visible in this listing.
 */
502 static void pohmelfs_state_exit(struct pohmelfs_state
*st
)
507 pohmelfs_poll_exit(st
);
508 st
->sock
->ops
->shutdown(st
->sock
, 2);
510 pohmelfs_print_addr(&st
->sa
, "disconnected\n");
511 sock_release(st
->sock
);
/*
 * kref release callback (passed to kref_put() by pohmelfs_state_put()):
 * recovers the state from its embedded refcnt and runs
 * pohmelfs_state_exit() on it.
 */
514 static void pohmelfs_state_release(struct kref
*kref
)
516 struct pohmelfs_state
*st
= container_of(kref
, struct pohmelfs_state
, refcnt
);
517 pohmelfs_state_exit(st
);
/*
 * Drop one reference on the state via kref_put(), with
 * pohmelfs_state_release() as the release callback.
 */
520 void pohmelfs_state_put(struct pohmelfs_state
*st
)
522 kref_put(&st
->refcnt
, pohmelfs_state_release
);
/*
 * Drop everything still attached to a state before it is killed.
 *
 * Removes the state's routes, then under st->trans_lock unlinks and puts
 * every transaction on st->trans_list and puts transactions found in the
 * rb-tree (starting from rb_first()). Finally waits for/cancels both work
 * items with cancel_work_sync().
 *
 * NOTE(review): the loop around the rb-tree walk and the removal of nodes
 * from the tree are not visible in this listing.
 */
525 static void pohmelfs_state_clean(struct pohmelfs_state
*st
)
527 struct pohmelfs_trans
*t
, *tmp
;
529 pohmelfs_route_remove_all(st
);
531 mutex_lock(&st
->trans_lock
);
532 list_for_each_entry_safe(t
, tmp
, &st
->trans_list
, trans_entry
) {
533 list_del(&t
->trans_entry
);
534 pohmelfs_trans_put(t
);
538 struct rb_node
*n
= rb_first(&st
->trans_root
);
542 t
= rb_entry(n
, struct pohmelfs_trans
, trans_node
);
543 pohmelfs_trans_put(t
);
545 mutex_unlock(&st
->trans_lock
);
547 cancel_work_sync(&st
->send_work
);
548 cancel_work_sync(&st
->recv_work
);
/*
 * Destroy a state that is no longer on any state list.
 *
 * BUG()s if st->state_entry is still linked, then cleans the state with
 * pohmelfs_state_clean() and drops a reference with pohmelfs_state_put().
 */
551 void pohmelfs_state_kill(struct pohmelfs_state
*st
)
553 BUG_ON(!list_empty(&st
->state_entry
));
555 pohmelfs_state_clean(st
);
556 pohmelfs_state_put(st
);
/*
 * Kick the sender: queue the state's send_work on the connection workqueue
 * unless the connection is shutting down (conn->need_exit).
 */
559 void pohmelfs_state_schedule(struct pohmelfs_state
*st
)
561 if (!st
->conn
->need_exit
)
562 queue_work(st
->conn
->wq
, &st
->send_work
);
/*
 * Retire a failed state and schedule a reconnect to its address.
 *
 * Visible steps, in order:
 *  - remove the state's routes;
 *  - under conn->state_lock, move the state to conn->kill_state_list;
 *  - allocate a struct pohmelfs_reconnect (GFP_NOIO) and copy the state's
 *    address, address length and group id into it;
 *  - under conn->reconnect_lock, compare it against the entries already on
 *    conn->reconnect_list (same addrlen, then memcmp() of the address) and
 *    add it to the tail of the list;
 *  - queue conn->reconnect_work with zero delay and log
 *    "reconnection added".
 *
 * NOTE(review): allocation-failure handling, what happens when a matching
 * entry is already queued (presumably the new entry is discarded) and the
 * return value are not visible in this listing - confirm against the full
 * source.
 */
565 int pohmelfs_state_add_reconnect(struct pohmelfs_state
*st
)
567 struct pohmelfs_connection
*conn
= st
->conn
;
568 struct pohmelfs_reconnect
*r
, *tmp
;
571 pohmelfs_route_remove_all(st
);
574 * Remove state from route table
576 spin_lock(&conn
->state_lock
);
577 list_move(&st
->state_entry
, &conn
->kill_state_list
);
578 spin_unlock(&conn
->state_lock
);
580 r
= kzalloc(sizeof(struct pohmelfs_reconnect
), GFP_NOIO
);
586 memcpy(&r
->sa
, &st
->sa
, sizeof(struct sockaddr_storage
));
587 r
->addrlen
= st
->addrlen
;
588 r
->group_id
= st
->group_id
;
590 mutex_lock(&conn
->reconnect_lock
);
591 list_for_each_entry(tmp
, &conn
->reconnect_list
, reconnect_entry
) {
592 if (tmp
->addrlen
!= r
->addrlen
)
595 if (memcmp(&tmp
->sa
, &r
->sa
, r
->addrlen
))
603 list_add_tail(&r
->reconnect_entry
, &conn
->reconnect_list
);
605 mutex_unlock(&conn
->reconnect_lock
);
610 /* we do not really care if this work will not be processed immediately */
611 queue_delayed_work(conn
->wq
, &conn
->reconnect_work
, 0);
613 pohmelfs_print_addr(&st
->sa
, "reconnection added\n");