/*
 * fs/pohmelfs/net.c — network state handling.
 * Snapshot of commit "Fixed page sending"
 * (blob f53203ce99c9c8702a99aa22f66e820dfc950cf1).
 */
1 /*
2 * Copyright (C) 2011+ Evgeniy Polyakov <zbr@ioremap.net>
3 */
5 #include <linux/in.h>
6 #include <linux/in6.h>
7 #include <linux/net.h>
9 #include <net/sock.h>
10 #include <net/tcp.h>
12 #include "pohmelfs.h"
/*
 * Shared scratch buffer used by pohmelfs_suck_scratch() to drain reply
 * payloads that match no known transaction. Allocated elsewhere;
 * pohmelfs_scratch_buf_size is the chunk size used when draining.
 */
void *pohmelfs_scratch_buf;
int pohmelfs_scratch_buf_size = 4096;
17 void pohmelfs_print_addr(struct sockaddr_storage *addr, const char *fmt, ...)
19 struct sockaddr *sa = (struct sockaddr *)addr;
20 va_list args;
21 char *ptr;
23 va_start(args, fmt);
24 ptr = kvasprintf(GFP_NOIO, fmt, args);
25 if (!ptr)
26 goto err_out_exit;
28 if (sa->sa_family == AF_INET) {
29 struct sockaddr_in *sin = (struct sockaddr_in *)addr;
30 pr_info("pohmelfs: %pI4:%d: %s", &sin->sin_addr.s_addr, ntohs(sin->sin_port), ptr);
31 } else if (sa->sa_family == AF_INET6) {
32 struct sockaddr_in6 *sin = (struct sockaddr_in6 *)addr;
33 pr_info("pohmelfs: %pI6:%d: %s", &sin->sin6_addr, ntohs(sin->sin6_port), ptr);
36 kfree(ptr);
37 err_out_exit:
38 va_end(args);
42 * Basic network sending/receiving functions.
43 * Blocked mode is used.
45 int pohmelfs_data_recv(struct pohmelfs_state *st, void *buf, u64 size, unsigned int flags)
47 struct msghdr msg;
48 struct kvec iov;
49 int err;
51 BUG_ON(!size);
53 iov.iov_base = buf;
54 iov.iov_len = size;
56 msg.msg_iov = (struct iovec *)&iov;
57 msg.msg_iovlen = 1;
58 msg.msg_name = NULL;
59 msg.msg_namelen = 0;
60 msg.msg_control = NULL;
61 msg.msg_controllen = 0;
62 msg.msg_flags = flags;
64 err = kernel_recvmsg(st->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);
65 if (err < 0)
66 goto err_out_exit;
68 err_out_exit:
69 return err;
72 int pohmelfs_recv(struct pohmelfs_trans *t, struct pohmelfs_state *recv, void *data, int size)
74 int err;
76 err = pohmelfs_data_recv(recv, data, size, MSG_DONTWAIT);
77 if (err < 0)
78 return err;
80 t->io_offset += err;
81 return err;
84 static int pohmelfs_data_send(struct pohmelfs_trans *t)
86 struct msghdr msg;
87 struct iovec io;
88 int err;
90 msg.msg_name = NULL;
91 msg.msg_namelen = 0;
92 msg.msg_control = NULL;
93 msg.msg_controllen = 0;
94 msg.msg_flags = MSG_DONTWAIT;
96 msg.msg_iov = &io;
97 msg.msg_iovlen = 1;
100 if (t->io_offset < t->header_size) {
101 io.iov_base = (void *)(&t->cmd) + t->io_offset;
102 io.iov_len = t->header_size - t->io_offset;
104 err = kernel_sendmsg(t->st->sock, &msg, (struct kvec *)msg.msg_iov, 1, io.iov_len);
105 if (err < 0) {
106 if (err == 0)
107 err = -ECONNRESET;
108 goto err_out_exit;
111 t->io_offset += err;
114 if ((t->io_offset >= t->header_size) && t->data) {
115 size_t sent_size = t->io_offset - t->header_size;
116 io.iov_base = t->data + sent_size;
117 io.iov_len = t->data_size - sent_size;
119 err = kernel_sendmsg(t->st->sock, &msg, (struct kvec *)msg.msg_iov, 1, io.iov_len);
120 if (err < 0) {
121 if (err == 0)
122 err = -ECONNRESET;
123 goto err_out_exit;
126 t->io_offset += err;
130 err = 0;
132 err_out_exit:
133 return err;
136 static int pohmelfs_page_send(struct pohmelfs_trans *t)
138 struct pohmelfs_write_ctl *ctl = t->wctl;
139 struct msghdr msg;
140 struct iovec io;
141 unsigned i;
142 int err = -EINVAL;
144 if (t->io_offset < t->header_size) {
145 io.iov_base = (void *)(&t->cmd) + t->io_offset;
146 io.iov_len = t->header_size - t->io_offset;
148 msg.msg_name = NULL;
149 msg.msg_namelen = 0;
150 msg.msg_control = NULL;
151 msg.msg_controllen = 0;
152 msg.msg_flags = MSG_DONTWAIT;
154 msg.msg_iov = &io;
155 msg.msg_iovlen = 1;
157 err = kernel_sendmsg(t->st->sock, &msg, (struct kvec *)msg.msg_iov, 1, io.iov_len);
158 if (err < 0) {
159 if (err == 0)
160 err = -ECONNRESET;
161 goto err_out_exit;
164 t->io_offset += err;
167 if (t->io_offset >= t->header_size) {
168 size_t skip_offset = 0;
169 size_t size = le64_to_cpu(t->cmd.cmd.size) + sizeof(struct dnet_cmd) - t->io_offset;
170 size_t current_io_offset = t->io_offset - t->header_size;
172 for (i = 0; i < pagevec_count(&ctl->pvec); ++i) {
173 struct page *page = ctl->pvec.pages[i];
174 size_t sz = PAGE_CACHE_SIZE;
176 if (sz > size)
177 sz = size;
179 if (current_io_offset > skip_offset + sz) {
180 skip_offset += sz;
181 continue;
184 sz -= current_io_offset - skip_offset;
186 err = kernel_sendpage(t->st->sock, page, current_io_offset - skip_offset, sz, MSG_DONTWAIT);
188 pr_debug("pohmelfs: %s: %d/%d: total-size: %llu, io-offset: %llu, rest-size: %zd, current-io: %zd, "
189 "skip-offset: %zd, sz: %zu: %d\n",
190 pohmelfs_dump_id(pohmelfs_inode(t->inode)->id.id), i, pagevec_count(&ctl->pvec),
191 (unsigned long long)le64_to_cpu(t->cmd.cmd.size) + sizeof(struct dnet_cmd),
192 t->io_offset, size, current_io_offset, skip_offset, sz, err);
194 if (err <= 0) {
195 if (err == 0)
196 err = -ECONNRESET;
197 goto err_out_exit;
200 current_io_offset += err;
201 skip_offset = current_io_offset;
202 size -= err;
203 t->io_offset += err;
205 err = 0;
209 err_out_exit:
210 return err;
/*
 * Polling machinery.
 */

/*
 * Carrier for poll-table registration: pohmelfs_queue_func() receives the
 * embedded poll_table and uses container_of() to recover the state.
 */
struct pohmelfs_poll_helper {
	poll_table pt;		/* passed to ->poll(); identifies this helper */
	struct pohmelfs_state *st;	/* state the callback is registered for */
};
/*
 * Wait-queue wakeup callback: socket activity schedules the state's
 * io_work on the connection's workqueue unless the connection is
 * being torn down. Always returns 0 (does not stop the wakeup walk).
 */
static int pohmelfs_queue_wake(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	struct pohmelfs_state *st = container_of(wait, struct pohmelfs_state, wait);

	if (!st->conn->need_exit)
		queue_work(st->conn->wq, &st->io_work);
	return 0;
}
231 static void pohmelfs_queue_func(struct file *file, wait_queue_head_t *whead, poll_table *pt)
233 struct pohmelfs_state *st = container_of(pt, struct pohmelfs_poll_helper, pt)->st;
235 st->whead = whead;
237 init_waitqueue_func_entry(&st->wait, pohmelfs_queue_wake);
238 add_wait_queue(whead, &st->wait);
241 static void pohmelfs_poll_exit(struct pohmelfs_state *st)
243 if (st->whead) {
244 remove_wait_queue(st->whead, &st->wait);
245 st->whead = NULL;
249 static int pohmelfs_poll_init(struct pohmelfs_state *st)
251 struct pohmelfs_poll_helper ph;
253 ph.st = st;
254 init_poll_funcptr(&ph.pt, &pohmelfs_queue_func);
256 st->sock->ops->poll(NULL, st->sock, &ph.pt);
257 return 0;
260 static int pohmelfs_revents(struct pohmelfs_state *st, unsigned mask)
262 unsigned revents;
264 revents = st->sock->ops->poll(NULL, st->sock, NULL);
265 if (revents & mask)
266 return 0;
268 if (revents & (POLLERR | POLLHUP | POLLNVAL | POLLRDHUP | POLLREMOVE)) {
269 pohmelfs_print_addr(&st->sa, "error revents: %x\n", revents);
270 return -ECONNRESET;
273 return -EAGAIN;
276 static int pohmelfs_state_send(struct pohmelfs_state *st)
278 struct pohmelfs_trans *t = NULL;
279 int trans_put = 0;
280 size_t size;
281 int err = -EAGAIN;
283 mutex_lock(&st->trans_lock);
284 if (!list_empty(&st->trans_list))
285 t = list_first_entry(&st->trans_list, struct pohmelfs_trans, trans_entry);
286 mutex_unlock(&st->trans_lock);
288 if (!t)
289 goto err_out_exit;
291 err = pohmelfs_revents(st, POLLOUT);
292 if (err)
293 goto err_out_exit;
295 size = le64_to_cpu(t->cmd.cmd.size) + sizeof(struct dnet_cmd);
296 pr_debug("pohmelfs: %s: starting sending: %llu/%zd\n", pohmelfs_dump_id(pohmelfs_inode(t->inode)->id.id), t->io_offset, size);
298 if (t->wctl)
299 err = pohmelfs_page_send(t);
300 else
301 err = pohmelfs_data_send(t);
303 pr_debug("pohmelfs: %s: sent: %llu/%zd: %d\n", pohmelfs_dump_id(pohmelfs_inode(t->inode)->id.id), t->io_offset, size, err);
304 if (!err && (t->io_offset == size)) {
305 mutex_lock(&st->trans_lock);
306 list_del_init(&t->trans_entry);
307 err = pohmelfs_trans_insert_tree(st, t);
308 if (err)
309 trans_put = 1;
310 t->io_offset = 0;
311 mutex_unlock(&st->trans_lock);
314 BUG_ON(t->io_offset > size);
316 if (trans_put)
317 pohmelfs_trans_put(t);
319 if ((err < 0) && (err != -EAGAIN))
320 goto err_out_exit;
322 err_out_exit:
323 return err;
326 static void pohmelfs_suck_scratch(struct pohmelfs_state *st)
328 struct dnet_cmd *cmd = &st->cmd;
329 int err = 0;
331 pr_debug("pohmelfs_suck_scratch: %llu\n", (unsigned long long)cmd->size);
333 while (cmd->size) {
334 int sz = pohmelfs_scratch_buf_size;
336 if (cmd->size < sz)
337 sz = cmd->size;
339 err = pohmelfs_data_recv(st, pohmelfs_scratch_buf, sz, MSG_WAITALL);
340 if (err < 0) {
341 pohmelfs_print_addr(&st->sa, "recv-scratch err: %d\n", err);
342 goto err_out_exit;
345 cmd->size -= err;
348 err_out_exit:
349 st->cmd_read = 1;
/*
 * Receive side of the state machine: read one dnet_cmd header (unless one
 * is already pending), look up the matching transaction and feed it the
 * reply, then complete/remove the transaction as appropriate.
 * Returns 0 or a negative error; -EAGAIN means "no data yet".
 */
static int pohmelfs_state_recv(struct pohmelfs_state *st)
{
	struct dnet_cmd *cmd = &st->cmd;
	struct pohmelfs_trans *t;
	unsigned long long trans;
	int err;

	err = pohmelfs_revents(st, POLLIN);
	if (err)
		goto err_out_exit;

	/* cmd_read set means the previous command was fully consumed. */
	if (st->cmd_read) {
		err = pohmelfs_data_recv(st, cmd, sizeof(struct dnet_cmd), MSG_WAITALL);
		if (err <= 0) {
			if (err == 0)
				err = -ECONNRESET;

			pohmelfs_print_addr(&st->sa, "recv error: %d\n", err);
			goto err_out_exit;
		}

		dnet_convert_cmd(cmd);

		/* NOTE(review): 'trans' is computed but never used below. */
		trans = cmd->trans & ~DNET_TRANS_REPLY;
		st->cmd_read = 0;
	}

	t = pohmelfs_trans_lookup(st, cmd);
	if (!t) {
		/* Unknown transaction: drain its payload into the scratch buffer. */
		pohmelfs_suck_scratch(st);

		err = 0;
		goto err_out_exit;
	}
	if (cmd->size && (t->io_offset != cmd->size)) {
		err = t->cb.recv_reply(t, st);
		if (err && (err != -EAGAIN)) {
			pohmelfs_print_addr(&st->sa, "recv-reply error: %d\n", err);
			goto err_out_remove;
		}

		/* Reply incomplete: drop only the lookup reference and wait for more data. */
		if (t->io_offset != cmd->size)
			goto err_out_put;
	}

	err = t->cb.complete(t, st);
	if (err) {
		pohmelfs_print_addr(&st->sa, "recv-complete err: %d\n", err);
	}

	kfree(t->recv_data);
	t->recv_data = NULL;
	t->io_offset = 0;

err_out_remove:
	/* only remove and free transaction if there is error or there will be no more replies */
	if (!(cmd->flags & DNET_FLAGS_MORE) || err) {
		pohmelfs_trans_remove(t);
		/*
		 * refcnt was grabbed twice:
		 * in pohmelfs_trans_lookup()
		 * and at transaction creation
		 */
		pohmelfs_trans_put(t);
	}
	st->cmd_read = 1;
	if (err) {
		/*
		 * NOTE(review): t->io_offset is read after the put above may have
		 * dropped a reference — confirm the lookup reference (released at
		 * err_out_put below) is what keeps t alive here.
		 */
		cmd->size -= t->io_offset;
		t->io_offset = 0;
	}

err_out_put:
	pohmelfs_trans_put(t);
err_out_exit:
	return err;
}
430 static void pohmelfs_state_io_work(struct work_struct *work)
432 struct pohmelfs_state *st = container_of(work, struct pohmelfs_state, io_work);
433 int send_err, recv_err;
435 send_err = recv_err = -EAGAIN;
436 while (!st->conn->psb->need_exit) {
437 send_err = pohmelfs_state_send(st);
438 if (send_err && (send_err != -EAGAIN)) {
439 pohmelfs_print_addr(&st->sa, "state send error: %d\n", send_err);
440 goto err_out_exit;
443 recv_err = pohmelfs_state_recv(st);
444 if (recv_err && (recv_err != -EAGAIN)) {
445 pohmelfs_print_addr(&st->sa, "state recv error: %d\n", recv_err);
446 goto err_out_exit;
449 if ((send_err == -EAGAIN) && (recv_err == -EAGAIN))
450 break;
453 err_out_exit:
454 if ((send_err && (send_err != -EAGAIN)) || (recv_err && (recv_err != -EAGAIN))) {
455 pohmelfs_state_add_reconnect(st);
457 return;
460 struct pohmelfs_state *pohmelfs_addr_exist(struct pohmelfs_connection *conn, struct sockaddr_storage *sa, int addrlen)
462 struct pohmelfs_state *st;
464 list_for_each_entry(st, &conn->state_list, state_entry) {
465 if (st->addrlen != addrlen)
466 continue;
468 if (!memcmp(&st->sa, sa, addrlen)) {
469 return st;
473 return 0;
/*
 * Allocate a new state, create and connect its TCP socket, register for
 * poll notification and insert it into the connection's state list.
 * @ask_route: also issue a route-table request after connecting.
 * Returns the new state or ERR_PTR(-errno); -EEXIST means a state for
 * this address already exists (logged paths skip that case).
 */
struct pohmelfs_state *pohmelfs_state_create(struct pohmelfs_connection *conn, struct sockaddr_storage *sa, int addrlen,
		int ask_route, int group_id)
{
	int err = 0;
	struct pohmelfs_state *st;
	struct sockaddr *addr = (struct sockaddr *)sa;

	/* early check - this state can be inserted into route table, no need to create state and check again */
	spin_lock(&conn->state_lock);
	if (pohmelfs_addr_exist(conn, sa, addrlen))
		err = -EEXIST;
	spin_unlock(&conn->state_lock);

	if (err)
		goto err_out_exit;

	st = kzalloc(sizeof(struct pohmelfs_state), GFP_KERNEL);
	if (!st) {
		err = -ENOMEM;
		goto err_out_exit;
	}

	st->conn = conn;
	mutex_init(&st->trans_lock);
	INIT_LIST_HEAD(&st->trans_list);
	st->trans_root = RB_ROOT;

	st->group_id = group_id;

	kref_init(&st->refcnt);

	INIT_WORK(&st->io_work, pohmelfs_state_io_work);

	/* Start in "expect a command header" mode. */
	st->cmd_read = 1;

	err = sock_create_kern(addr->sa_family, SOCK_STREAM, IPPROTO_TCP, &st->sock);
	if (err) {
		pohmelfs_print_addr(sa, "sock_create: failed family: %d, err: %d\n", addr->sa_family, err);
		goto err_out_free;
	}

	/* GFP_NOIO: socket allocations must not recurse into filesystem I/O. */
	st->sock->sk->sk_allocation = GFP_NOIO;
	st->sock->sk->sk_sndtimeo = st->sock->sk->sk_rcvtimeo = msecs_to_jiffies(60000);

	/* 'err' is reused as the boolean option value (1) for SO_KEEPALIVE. */
	err = 1;
	sock_setsockopt(st->sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&err, 4);

	tcp_setsockopt(st->sock->sk, SOL_TCP, TCP_KEEPIDLE, (char *)&conn->psb->keepalive_idle, 4);
	tcp_setsockopt(st->sock->sk, SOL_TCP, TCP_KEEPINTVL, (char *)&conn->psb->keepalive_interval, 4);
	tcp_setsockopt(st->sock->sk, SOL_TCP, TCP_KEEPCNT, (char *)&conn->psb->keepalive_cnt, 4);

	err = kernel_connect(st->sock, (struct sockaddr *)addr, addrlen, 0);
	if (err) {
		pohmelfs_print_addr(sa, "kernel_connect: failed family: %d, err: %d\n", addr->sa_family, err);
		goto err_out_release;
	}
	/* NOTE(review): duplicate of the timeout assignment above — looks redundant; confirm. */
	st->sock->sk->sk_sndtimeo = st->sock->sk->sk_rcvtimeo = msecs_to_jiffies(60000);

	memcpy(&st->sa, sa, sizeof(struct sockaddr_storage));
	st->addrlen = addrlen;

	err = pohmelfs_poll_init(st);
	if (err)
		goto err_out_shutdown;

	/* Re-check under the lock: someone may have added this address meanwhile. */
	spin_lock(&conn->state_lock);
	err = -EEXIST;
	if (!pohmelfs_addr_exist(conn, sa, addrlen)) {
		list_add_tail(&st->state_entry, &conn->state_list);
		err = 0;
	}
	spin_unlock(&conn->state_lock);

	if (err)
		goto err_out_poll_exit;

	if (ask_route) {
		err = pohmelfs_route_request(st);
		/*
		 * NOTE(review): on this failure st has already been added to
		 * conn->state_list but the error path below frees it without
		 * removing it from the list — confirm this cannot be reached
		 * with the state still linked.
		 */
		if (err)
			goto err_out_poll_exit;
	}

	pohmelfs_print_addr(sa, "%d: connected\n", st->conn->idx);

	return st;

err_out_poll_exit:
	pohmelfs_poll_exit(st);
err_out_shutdown:
	st->sock->ops->shutdown(st->sock, 2);
err_out_release:
	sock_release(st->sock);
err_out_free:
	kfree(st);
err_out_exit:
	if (err != -EEXIST) {
		pohmelfs_print_addr(sa, "state creation failed: %d\n", err);
	}
	return ERR_PTR(err);
}
/*
 * Shut down and release the state's socket, detaching the poll
 * wait-queue entry first. Safe to call when no socket was created.
 */
static void pohmelfs_state_exit(struct pohmelfs_state *st)
{
	if (!st->sock)
		return;

	pohmelfs_poll_exit(st);
	st->sock->ops->shutdown(st->sock, 2);

	pohmelfs_print_addr(&st->sa, "disconnected\n");
	sock_release(st->sock);
}
/*
 * kref release callback: tears down the socket.
 * NOTE(review): 'st' itself is not kfree'd here — confirm the owning
 * code frees the structure elsewhere.
 */
static void pohmelfs_state_release(struct kref *kref)
{
	struct pohmelfs_state *st = container_of(kref, struct pohmelfs_state, refcnt);
	pohmelfs_state_exit(st);
}
/* Drop a reference; the last put runs pohmelfs_state_release(). */
void pohmelfs_state_put(struct pohmelfs_state *st)
{
	kref_put(&st->refcnt, pohmelfs_state_release);
}
601 static void pohmelfs_state_clean(struct pohmelfs_state *st)
603 struct pohmelfs_trans *t, *tmp;
605 pohmelfs_route_remove_all(st);
607 mutex_lock(&st->trans_lock);
608 list_for_each_entry_safe(t, tmp, &st->trans_list, trans_entry) {
609 list_del(&t->trans_entry);
611 pohmelfs_trans_put(t);
614 while (1) {
615 struct rb_node *n = rb_first(&st->trans_root);
616 if (!n)
617 break;
619 t = rb_entry(n, struct pohmelfs_trans, trans_node);
621 rb_erase(&t->trans_node, &st->trans_root);
622 pohmelfs_trans_put(t);
624 mutex_unlock(&st->trans_lock);
626 cancel_work_sync(&st->io_work);
/*
 * Final teardown of a state that has already been unlinked from the
 * connection's state list (hence the BUG_ON).
 */
void pohmelfs_state_kill(struct pohmelfs_state *st)
{
	BUG_ON(!list_empty(&st->state_entry));

	pohmelfs_state_clean(st);
	pohmelfs_state_put(st);
}
/* Kick the state's io work unless the connection is exiting. */
void pohmelfs_state_schedule(struct pohmelfs_state *st)
{
	if (!st->conn->need_exit)
		queue_work(st->conn->wq, &st->io_work);
}
/*
 * Queue a dead state for reconnection: record its address on the
 * connection's reconnect list (unless already present), move the state
 * to the kill list and kick the reconnect work.
 * Returns 0, -ENOMEM, or -EEXIST when the address is already queued.
 * Note: the kill-list move and work kick at err_out_exit run on ALL
 * paths, including errors — the gotos are structured for that.
 */
int pohmelfs_state_add_reconnect(struct pohmelfs_state *st)
{
	struct pohmelfs_connection *conn = st->conn;
	struct pohmelfs_reconnect *r, *tmp;
	int err = 0;

	pohmelfs_route_remove_all(st);

	r = kzalloc(sizeof(struct pohmelfs_reconnect), GFP_NOIO);
	if (!r) {
		err = -ENOMEM;
		goto err_out_exit;
	}

	memcpy(&r->sa, &st->sa, sizeof(struct sockaddr_storage));
	r->addrlen = st->addrlen;
	r->group_id = st->group_id;

	/* Reject duplicates already queued for reconnection. */
	mutex_lock(&conn->reconnect_lock);
	list_for_each_entry(tmp, &conn->reconnect_list, reconnect_entry) {
		if (tmp->addrlen != r->addrlen)
			continue;

		if (memcmp(&tmp->sa, &r->sa, r->addrlen))
			continue;

		err = -EEXIST;
		break;
	}

	if (!err) {
		list_add_tail(&r->reconnect_entry, &conn->reconnect_list);
	}
	mutex_unlock(&conn->reconnect_lock);

	if (err)
		goto err_out_free;

	pohmelfs_print_addr(&st->sa, "reconnection added\n");
	err = 0;
	goto err_out_exit;

err_out_free:
	kfree(r);
err_out_exit:

	spin_lock(&conn->state_lock);
	list_move(&st->state_entry, &conn->kill_state_list);
	spin_unlock(&conn->state_lock);

	/* we do not really care if this work will not be processed immediately */
	queue_delayed_work(conn->wq, &conn->reconnect_work, 0);

	return err;
}