After a transaction is sent, move it into the per-state tree instead of the list. This speeds up reply...
[pohmelfs.git] / fs / pohmelfs / net.c
blob47d7aa6e30699308610142ae9cc7ab798d01a65a
1 /*
2 * Copyright (C) 2011+ Evgeniy Polyakov <zbr@ioremap.net>
3 */
5 #include <linux/in.h>
6 #include <linux/in6.h>
7 #include <linux/net.h>
9 #include <net/sock.h>
10 #include <net/tcp.h>
12 #include "pohmelfs.h"
/*
 * Shared scratch area used by pohmelfs_suck_scratch() to read and discard
 * reply payloads nobody is waiting for. pohmelfs_scratch_buf_size is the
 * maximum number of bytes read into it per call. The buffer itself is not
 * allocated in this file (NOTE(review): presumably at module init).
 */
int pohmelfs_scratch_buf_size = 4096;
void *pohmelfs_scratch_buf;
17 void pohmelfs_print_addr(struct sockaddr_storage *addr, const char *fmt, ...)
19 struct sockaddr *sa = (struct sockaddr *)addr;
20 va_list args;
21 char *ptr;
23 va_start(args, fmt);
24 ptr = kvasprintf(GFP_NOIO, fmt, args);
25 if (!ptr)
26 goto err_out_exit;
28 if (sa->sa_family == AF_INET) {
29 struct sockaddr_in *sin = (struct sockaddr_in *)addr;
30 pr_info("pohmelfs: %pI4:%d: %s", &sin->sin_addr.s_addr, ntohs(sin->sin_port), ptr);
31 } else if (sa->sa_family == AF_INET6) {
32 struct sockaddr_in6 *sin = (struct sockaddr_in6 *)addr;
33 pr_info("pohmelfs: %pI6:%d: %s", &sin->sin6_addr, ntohs(sin->sin6_port), ptr);
36 kfree(ptr);
37 err_out_exit:
38 va_end(args);
42 * Basic network sending/receiving functions.
43 * Blocked mode is used.
45 int pohmelfs_data_recv(struct pohmelfs_state *st, void *buf, u64 size, unsigned int flags)
47 struct msghdr msg;
48 struct kvec iov;
49 int err;
51 BUG_ON(!size);
53 iov.iov_base = buf;
54 iov.iov_len = size;
56 msg.msg_iov = (struct iovec *)&iov;
57 msg.msg_iovlen = 1;
58 msg.msg_name = NULL;
59 msg.msg_namelen = 0;
60 msg.msg_control = NULL;
61 msg.msg_controllen = 0;
62 msg.msg_flags = flags;
64 err = kernel_recvmsg(st->sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);
65 if (err <= 0) {
66 if (err == 0)
67 err = -ECONNRESET;
68 goto err_out_exit;
71 err_out_exit:
72 return err;
75 int pohmelfs_recv(struct pohmelfs_trans *t, struct pohmelfs_state *recv, void *data, int size)
77 int err;
79 err = pohmelfs_data_recv(recv, data, size, MSG_DONTWAIT);
80 if (err < 0)
81 return err;
83 t->recv_offset += err;
84 return err;
87 static int pohmelfs_data_send(struct pohmelfs_trans *t)
89 struct msghdr msg;
90 struct iovec io[2];
91 int err, ionum = 1;
93 io[0].iov_base = &t->cmd;
94 io[0].iov_len = t->header_size;
96 if (t->data) {
97 io[1].iov_base = t->data;
98 io[1].iov_len = t->data_size;
99 ionum = 2;
102 msg.msg_name = NULL;
103 msg.msg_namelen = 0;
104 msg.msg_control = NULL;
105 msg.msg_controllen = 0;
106 msg.msg_flags = MSG_WAITALL;
108 msg.msg_iov = io;
109 msg.msg_iovlen = ionum;
111 err = kernel_sendmsg(t->st->sock, &msg, (struct kvec *)msg.msg_iov, ionum, t->data_size + t->header_size);
112 if (err <= 0) {
113 if (err == 0)
114 err = -ECONNRESET;
115 goto err_out_exit;
118 err = 0;
120 err_out_exit:
121 return err;
124 static int pohmelfs_page_send(struct pohmelfs_trans *t)
126 struct pohmelfs_write_ctl *ctl = t->wctl;
127 size_t size = le64_to_cpu(t->cmd.p.io.size);
128 pgoff_t offset = le64_to_cpu(t->cmd.p.io.offset);
129 struct msghdr msg;
130 struct iovec io;
131 unsigned i;
132 int err;
134 io.iov_base = &t->cmd;
135 io.iov_len = t->header_size;
137 msg.msg_name = NULL;
138 msg.msg_namelen = 0;
139 msg.msg_control = NULL;
140 msg.msg_controllen = 0;
141 msg.msg_flags = MSG_WAITALL;
143 msg.msg_iov = &io;
144 msg.msg_iovlen = 1;
146 err = kernel_sendmsg(t->st->sock, &msg, (struct kvec *)msg.msg_iov, 1, t->header_size);
147 if (err <= 0) {
148 if (err == 0)
149 err = -ECONNRESET;
150 goto err_out_exit;
153 for (i = 0; i< pagevec_count(&ctl->pvec); ++i) {
154 struct page *page = ctl->pvec.pages[i];
155 pgoff_t off = offset & (PAGE_CACHE_SIZE - 1);
156 size_t sz = PAGE_CACHE_SIZE - off;
158 if (sz > size)
159 sz = size;
161 err = kernel_sendpage(t->st->sock, page, off, sz, msg.msg_flags);
162 if (err <= 0) {
163 if (err == 0)
164 err = -ECONNRESET;
166 goto err_out_reset;
169 size -= err;
170 offset += err;
174 return 0;
176 err_out_reset:
177 err_out_exit:
178 return err;
182 * Polling machinery.
185 struct pohmelfs_poll_helper {
186 poll_table pt;
187 struct pohmelfs_state *st;
190 static int pohmelfs_queue_wake(wait_queue_t *wait, unsigned mode, int sync, void *key)
192 struct pohmelfs_state *st = container_of(wait, struct pohmelfs_state, wait);
194 queue_work(st->psb->wq, &st->recv_work);
195 return 1;
198 static void pohmelfs_queue_func(struct file *file, wait_queue_head_t *whead, poll_table *pt)
200 struct pohmelfs_state *st = container_of(pt, struct pohmelfs_poll_helper, pt)->st;
202 st->whead = whead;
204 init_waitqueue_func_entry(&st->wait, pohmelfs_queue_wake);
205 add_wait_queue(whead, &st->wait);
208 static void pohmelfs_poll_exit(struct pohmelfs_state *st)
210 if (st->whead) {
211 remove_wait_queue(st->whead, &st->wait);
212 st->whead = NULL;
216 static int pohmelfs_poll_init(struct pohmelfs_state *st)
218 struct pohmelfs_poll_helper ph;
220 ph.st = st;
221 init_poll_funcptr(&ph.pt, &pohmelfs_queue_func);
223 st->sock->ops->poll(NULL, st->sock, &ph.pt);
224 return 0;
227 static void pohmelfs_state_send_work(struct work_struct *work)
229 struct pohmelfs_state *st = container_of(work, struct pohmelfs_state, send_work);
230 struct pohmelfs_trans *t;
231 int trans_put;
232 int err;
234 while (1) {
235 t = NULL;
236 trans_put = 0;
238 mutex_lock(&st->trans_lock);
239 if (!list_empty(&st->trans_list)) {
240 t = list_first_entry(&st->trans_list, struct pohmelfs_trans, trans_entry);
241 list_del_init(&t->trans_entry);
242 err = pohmelfs_trans_insert_tree(st, t);
243 if (err)
244 trans_put = 1;
246 mutex_unlock(&st->trans_lock);
248 if (!t)
249 break;
251 if (t->wctl)
252 err = pohmelfs_page_send(t);
253 else
254 err = pohmelfs_data_send(t);
256 if (trans_put)
257 pohmelfs_trans_put(t);
259 if (err) {
260 pohmelfs_print_addr(&st->sa, "send error: %d\n", err);
262 pohmelfs_state_add_reconnect(st);
263 break;
268 static void pohmelfs_suck_scratch(struct pohmelfs_state *st)
270 struct dnet_cmd *cmd = &st->cmd;
271 int err = 0;
273 pr_debug("pohmelfs_suck_scratch: %llu\n", (unsigned long long)cmd->size);
275 while (cmd->size) {
276 int sz = pohmelfs_scratch_buf_size;
278 if (cmd->size < sz)
279 sz = cmd->size;
281 err = pohmelfs_data_recv(st, pohmelfs_scratch_buf, sz, MSG_WAITALL);
282 if (err < 0) {
283 pohmelfs_print_addr(&st->sa, "recv-scratch err: %d\n", err);
284 goto err_out_exit;
287 cmd->size -= err;
290 err_out_exit:
291 st->cmd_read = 1;
294 static void pohmelfs_state_recv_work(struct work_struct *work)
296 struct pohmelfs_state *st = container_of(work, struct pohmelfs_state, recv_work);
297 struct dnet_cmd *cmd = &st->cmd;
298 struct pohmelfs_trans *t;
299 unsigned long long trans;
300 unsigned int revents;
301 int err = 0;
303 while (1) {
304 revents = st->sock->ops->poll(NULL, st->sock, NULL);
305 if (!(revents & POLLIN))
306 break;
308 if (st->cmd_read) {
309 err = pohmelfs_data_recv(st, cmd, sizeof(struct dnet_cmd), MSG_WAITALL);
310 if (err < 0) {
311 pohmelfs_print_addr(&st->sa, "recv error: %d\n", err);
312 goto err_out_exit;
315 dnet_convert_cmd(cmd);
317 trans = cmd->trans & ~DNET_TRANS_REPLY;
318 st->cmd_read = 0;
321 t = pohmelfs_trans_lookup(st, cmd);
322 if (!t) {
323 pohmelfs_suck_scratch(st);
325 err = 0;
326 goto err_out_continue;
328 if (cmd->size && (t->recv_offset != cmd->size)) {
329 err = t->cb.recv_reply(t, st);
330 if (err && (err != -EAGAIN)) {
331 pohmelfs_print_addr(&st->sa, "recv-reply error: %d\n", err);
332 goto err_out_remove;
335 if (t->recv_offset != cmd->size)
336 goto err_out_continue_put;
339 err = t->cb.complete(t, st);
340 if (err) {
341 pohmelfs_print_addr(&st->sa, "recv-complete err: %d\n", err);
344 kfree(t->recv_data);
345 t->recv_data = NULL;
346 t->recv_offset = 0;
348 err_out_remove:
349 /* only remove and free transaction if there is error or there will be no more replies */
350 if (!(cmd->flags & DNET_FLAGS_MORE) || err) {
351 pohmelfs_trans_remove(t);
354 * refcnt was grabbed twice:
355 * in pohmelfs_trans_lookup()
356 * and at transaction creation
358 pohmelfs_trans_put(t);
360 st->cmd_read = 1;
361 if (err) {
362 cmd->size -= t->recv_offset;
363 t->recv_offset = 0;
365 err_out_continue_put:
366 pohmelfs_trans_put(t);
367 err_out_continue:
368 if (err && (err != -EAGAIN)) {
369 //pohmelfs_suck_scratch(st);
370 goto err_out_exit;
373 continue;
376 err_out_exit:
377 if (err && err != -EAGAIN)
378 pohmelfs_state_add_reconnect(st);
379 return;
382 struct pohmelfs_state *pohmelfs_addr_exist(struct pohmelfs_sb *psb, struct sockaddr_storage *sa, int addrlen)
384 struct pohmelfs_state *st;
386 list_for_each_entry(st, &psb->state_list, state_entry) {
387 if (st->addrlen != addrlen)
388 continue;
390 if (!memcmp(&st->sa, sa, addrlen)) {
391 return st;
395 return 0;
398 struct pohmelfs_state *pohmelfs_state_create(struct pohmelfs_sb *psb, struct sockaddr_storage *sa, int addrlen,
399 int ask_route, int group_id)
401 int err = 0;
402 struct pohmelfs_state *st;
403 struct sockaddr *addr = (struct sockaddr *)sa;
405 /* early check - this state can be inserted into route table, no need to create state and check again */
406 spin_lock(&psb->state_lock);
407 if (pohmelfs_addr_exist(psb, sa, addrlen))
408 err = -EEXIST;
409 spin_unlock(&psb->state_lock);
411 if (err)
412 goto err_out_exit;
414 st = kzalloc(sizeof(struct pohmelfs_state), GFP_KERNEL);
415 if (!st) {
416 err = -ENOMEM;
417 goto err_out_exit;
420 st->psb = psb;
421 mutex_init(&st->trans_lock);
422 INIT_LIST_HEAD(&st->trans_list);
423 st->trans_root = RB_ROOT;
425 st->group_id = group_id;
427 kref_init(&st->refcnt);
429 INIT_WORK(&st->send_work, pohmelfs_state_send_work);
430 INIT_WORK(&st->recv_work, pohmelfs_state_recv_work);
432 st->cmd_read = 1;
434 err = sock_create(addr->sa_family, SOCK_STREAM, IPPROTO_TCP, &st->sock);
435 if (err) {
436 pohmelfs_print_addr(sa, "sock_create: failed family: %d, err: %d\n", addr->sa_family, err);
437 goto err_out_free;
440 st->sock->sk->sk_allocation = GFP_NOIO;
441 st->sock->sk->sk_sndtimeo = st->sock->sk->sk_rcvtimeo = msecs_to_jiffies(60000);
443 err = 1;
444 sock_setsockopt(st->sock, SOL_SOCKET, SO_KEEPALIVE, (char *)&err, 4);
446 tcp_setsockopt(st->sock->sk, SOL_TCP, TCP_KEEPIDLE, (char *)&psb->keepalive_idle, 4);
447 tcp_setsockopt(st->sock->sk, SOL_TCP, TCP_KEEPINTVL, (char *)&psb->keepalive_interval, 4);
448 tcp_setsockopt(st->sock->sk, SOL_TCP, TCP_KEEPCNT, (char *)&psb->keepalive_cnt, 4);
450 err = kernel_connect(st->sock, (struct sockaddr *)addr, addrlen, 0);
451 if (err) {
452 pohmelfs_print_addr(sa, "kernel_connect: failed family: %d, err: %d\n", addr->sa_family, err);
453 goto err_out_release;
455 st->sock->sk->sk_sndtimeo = st->sock->sk->sk_rcvtimeo = msecs_to_jiffies(60000);
457 memcpy(&st->sa, sa, sizeof(struct sockaddr_storage));
458 st->addrlen = addrlen;
460 pohmelfs_print_addr(sa, "connected\n");
462 err = pohmelfs_poll_init(st);
463 if (err)
464 goto err_out_shutdown;
467 spin_lock(&psb->state_lock);
468 err = -EEXIST;
469 if (!pohmelfs_addr_exist(psb, sa, addrlen)) {
470 list_add_tail(&st->state_entry, &psb->state_list);
471 err = 0;
473 spin_unlock(&psb->state_lock);
475 if (err)
476 goto err_out_poll_exit;
478 if (ask_route) {
479 err = pohmelfs_route_request(st);
480 if (err)
481 goto err_out_poll_exit;
484 return st;
486 err_out_poll_exit:
487 pohmelfs_poll_exit(st);
488 err_out_shutdown:
489 st->sock->ops->shutdown(st->sock, 2);
490 err_out_release:
491 sock_release(st->sock);
492 err_out_free:
493 kfree(st);
494 err_out_exit:
495 if (err != -EEXIST) {
496 pohmelfs_print_addr(sa, "state creation failed: %d\n", err);
498 return ERR_PTR(err);
501 static void pohmelfs_state_exit(struct pohmelfs_state *st)
503 if (!st->sock)
504 return;
506 pohmelfs_poll_exit(st);
507 st->sock->ops->shutdown(st->sock, 2);
509 pohmelfs_print_addr(&st->sa, "disconnected\n");
510 sock_release(st->sock);
513 static void pohmelfs_state_release(struct kref *kref)
515 struct pohmelfs_state *st = container_of(kref, struct pohmelfs_state, refcnt);
516 pohmelfs_state_exit(st);
519 void pohmelfs_state_put(struct pohmelfs_state *st)
521 kref_put(&st->refcnt, pohmelfs_state_release);
524 static void pohmelfs_state_clean(struct pohmelfs_state *st)
526 struct pohmelfs_trans *t, *tmp;
528 pohmelfs_route_remove_all(st);
530 mutex_lock(&st->trans_lock);
531 list_for_each_entry_safe(t, tmp, &st->trans_list, trans_entry) {
532 list_del(&t->trans_entry);
533 pohmelfs_trans_put(t);
536 while (1) {
537 struct rb_node *n = rb_first(&st->trans_root);
538 if (!n)
539 break;
541 t = rb_entry(n, struct pohmelfs_trans, trans_node);
542 pohmelfs_trans_put(t);
544 mutex_unlock(&st->trans_lock);
546 cancel_work_sync(&st->send_work);
547 cancel_work_sync(&st->recv_work);
550 void pohmelfs_state_kill(struct pohmelfs_state *st)
552 BUG_ON(!list_empty(&st->state_entry));
554 pohmelfs_state_clean(st);
555 pohmelfs_state_put(st);
558 void pohmelfs_state_schedule(struct pohmelfs_state *st)
560 struct pohmelfs_sb *psb = st->psb;
562 queue_work(psb->wq, &st->send_work);
565 int pohmelfs_state_add_reconnect(struct pohmelfs_state *st)
567 struct pohmelfs_sb *psb = st->psb;
568 struct pohmelfs_reconnect *r, *tmp;
569 int err = 0;
571 pohmelfs_route_remove_all(st);
574 * Remove state from route table
576 spin_lock(&psb->state_lock);
577 list_move(&st->state_entry, &psb->kill_state_list);
578 spin_unlock(&psb->state_lock);
580 r = kzalloc(sizeof(struct pohmelfs_reconnect), GFP_NOIO);
581 if (!r) {
582 err = -ENOMEM;
583 goto err_out_exit;
586 memcpy(&r->sa, &st->sa, sizeof(struct sockaddr_storage));
587 r->addrlen = st->addrlen;
588 r->group_id = st->group_id;
590 mutex_lock(&psb->reconnect_lock);
591 list_for_each_entry(tmp, &psb->reconnect_list, reconnect_entry) {
592 if (tmp->addrlen != r->addrlen)
593 continue;
595 if (memcmp(&tmp->sa, &r->sa, r->addrlen))
596 continue;
598 err = -EEXIST;
599 break;
602 if (!err) {
603 list_add_tail(&r->reconnect_entry, &psb->reconnect_list);
605 mutex_unlock(&psb->reconnect_lock);
607 if (err)
608 goto err_out_free;
610 /* we do not really care if this work will not be processed immediately */
611 queue_delayed_work(psb->wq, &psb->reconnect_work, 0);
613 pohmelfs_print_addr(&st->sa, "reconnection added\n");
614 err = 0;
615 goto err_out_exit;
617 err_out_free:
618 kfree(r);
619 err_out_exit:
620 return err;