libcor: nonblocking read
[corutils.git] / libcor.c
blob6faa4f22bd0920e2d4bbfa8dbd6ef16928f2ea12
1 /**
2 * Connection oriented routing user space utils
3 * Copyright (C) 2009-2011
4 * Authors:
5 * Michael Blizek
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License
9 * as published by the Free Software Foundation; either version 2
10 * of the License, or (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
20 * 02110-1301, USA.
23 #include <linux/types.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <assert.h>
30 #include "libcor.h"
/* Branch-prediction hints built on GCC's __builtin_expect. */
32 #define likely(x) __builtin_expect((x), 1)
33 #define unlikely(x) __builtin_expect((x), 0)
/*
 * Command codes; CD_CONNECT_NB, CD_CONNECT_PORT and CD_LIST_NEIGH are sent
 * by the send_* helpers in this file via send_cmd()/send_cmd_hdr().
 */
36 #define CD_CONNECT_NB 1
37 #define CD_CONNECT_PORT 2
38 #define CD_LIST_NEIGH 3
39 #define CD_SET_FORWARD_TIMEOUT 4
40 #define CD_SET_BACKWARD_TIMEOUT 5
/*
 * Response codes: read_resp_nonblock() compares the first response byte
 * against CDR_EXECOK and CDR_EXECFAILED.
 */
42 #define CDR_EXECOK 1
44 #define CDR_EXECFAILED 2
/*
 * Presumably the values of the 2 byte reason code that follows
 * CDR_EXECFAILED; this file only prints the reason code and never compares
 * it against these constants - TODO confirm.
 */
45 #define CDR_EXECFAILED_UNKNOWN_COMMAND 1
46 #define CDR_EXECFAILED_PERMISSION_DENIED 2
47 #define CDR_EXECFAILED_TEMPORARILY_OUT_OF_RESSOURCES 3
48 #define CDR_EXECFAILED_CMD_TOO_SHORT 4
49 #define CDR_EXECFAILED_CMD_TOO_LONG 5
50 #define CDR_EXECFAILED_TARGETADDR_DOESNTEXIST 6
51 #define CDR_EXECFAILED_PORTCLOSED 7
52 #define CDR_EXECFAILED_LISTENERQUEUE_FULL 8
53 #define CDR_EXECFAILED_ILLEGAL_COMMAND 9
/* Response code read_neigh_list_nonblock() requires as the first byte. */
55 #define CDR_BINDATA 3
/* Field identifiers matched while parsing a neighbor list response. */
57 #define LIST_NEIGH_FIELD_ADDR 1
58 #define LIST_NEIGH_FIELD_LATENCY 2
/* rdsock command dispatched by proc_rdsock_cmd(). */
62 #define CRD_KTU_CONNECT 1
/* rdsock command sent by send_rdsock_up(). */
64 #define CRD_UTK_UP 1
66 * CRD_UTK_UP[4] length[4] flags[8] addrlen[4] addr[addrlen]
71 #define CRD_UTK_CONNECTERROR 2
73 * CRD_UTK_CONNECTERROR[4] length[4] cookie[8] error[4]
77 static int bzero_nr_iffinished(
78 struct libcor_nonblock_resumeinfo *nr, int rc)
80 if (rc != RC_WOULDBLOCK) {
81 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
83 return rc;
86 static int send_fully(int fd, char *buf, __u32 len)
88 while (len > 0) {
89 int sent = send(fd, buf, len, 0);
91 if (sent == 0) {
92 printf("non-blocking write not supported");
93 return RC_CONNBROKEN;
96 if (sent == -1) {
97 if (errno == EINTR)
98 continue;
100 perror("send");
101 return RC_CONNBROKEN;
104 buf += sent;
105 len -= sent;
108 return RC_OK;
111 static int read_fully(int fd, struct libcor_nonblock_resumeinfo *nr,
112 char *buf, __u32 len, __u32 *maxread)
114 int rc = RC_OK;
116 if (unlikely(nr->type != RESUME_TYPE_READ)) {
117 assert(0);
118 exit(1);
119 return RC_CONNBROKEN;
122 if (unlikely(nr->data.read.read_fully.state != 0 && (
123 nr->data.read.read_fully.fd != fd ||
124 nr->data.read.read_fully.buf != buf ||
125 nr->data.read.read_fully.len != len ||
126 nr->data.read.read_fully.maxread != maxread))) {
127 assert(0);
128 exit(1);
129 return RC_CONNBROKEN;
132 if (nr->data.read.read_fully.state == 1) {
133 goto state_1;
134 } else if (unlikely(nr->data.read.read_fully.state != 0)) {
135 assert(0);
136 exit(1);
137 return RC_CONNBROKEN;
140 nr->data.read.read_fully.fd = fd;
141 nr->data.read.read_fully.buf = buf;
142 nr->data.read.read_fully.len = len;
143 nr->data.read.read_fully.maxread = maxread;
144 nr->data.read.read_fully.totalread = 0;
146 nr->data.read.read_fully.state = 1;
147 state_1:
149 if (maxread != 0) {
150 if (len > (*maxread)) {
151 printf("error in read_fully: maxread reached/n");
152 rc = RC_CONNBROKEN;
153 goto out;
155 (*maxread) -= len;
158 while (len > nr->data.read.read_fully.totalread) {
159 int rcvd = recv(fd, buf + nr->data.read.read_fully.totalread,
160 len - nr->data.read.read_fully.totalread, 0);
161 int u;
163 if (rcvd == 0) {
164 printf("non-blocking read not supported");
165 rc = RC_WOULDBLOCK;
166 goto out;
169 if (rcvd < 0) {
170 if (errno == EINTR)
171 continue;
173 perror("recv");
174 rc = RC_CONNBROKEN;
175 goto out;
178 /*printf("rcvd: %d:", rcvd);
179 for(u=0;u<rcvd;u++) {
180 printf(" %d, ", (__s32) ((__u8) buf[totalread+u]));
182 printf("\n");*/
184 nr->data.read.read_fully.totalread += (__u32) rcvd;
187 out:
188 if (rc != RC_WOULDBLOCK) {
189 bzero(&(nr->data.read.read_fully),
190 sizeof(nr->data.read.read_fully));
193 return RC_OK;
196 static int read_discard(int fd, struct libcor_nonblock_resumeinfo *nr,
197 __u32 len)
199 char buf[128];
200 int rc = RC_OK;
202 if (unlikely(nr->type != RESUME_TYPE_READ)) {
203 assert(0);
204 exit(1);
205 return RC_CONNBROKEN;
208 if (unlikely(nr->data.read.read_discard.state != 0 && (
209 nr->data.read.read_discard.fd != fd ||
210 nr->data.read.read_discard.len != len))) {
211 assert(0);
212 exit(1);
213 return RC_CONNBROKEN;
216 if (nr->data.read.read_discard.state == 1) {
217 goto state_1;
218 } else if (unlikely(nr->data.read.read_discard.state != 0)) {
219 assert(0);
220 exit(1);
221 return RC_CONNBROKEN;
224 nr->data.read.read_discard.fd = fd;
225 nr->data.read.read_discard.len = len;
226 nr->data.read.read_discard.discarded = 0;
229 nr->data.read.read_discard.state = 1;
230 state_1:
232 while (len > 0) {
233 int rcvd;
235 __u32 rcvlen = len - nr->data.read.read_discard.discarded;
236 if (rcvlen > 128)
237 rcvlen = 128;
239 rcvd = recv(fd, buf, rcvlen, 0);
241 if (rcvd == 0) {
242 rc = RC_WOULDBLOCK;
243 break;
246 if (rcvd < 0) {
247 if (errno == EINTR)
248 continue;
250 perror("recv");
251 rc = RC_CONNBROKEN;
252 goto out;
255 nr->data.read.read_discard.discarded -= rcvd;
258 out:
259 if (rc != RC_WOULDBLOCK) {
260 bzero(&(nr->data.read.read_discard),
261 sizeof(nr->data.read.read_discard));
264 return rc;
/*
 * Write len into buf using the variable length encoding of this protocol.
 *
 *   0 .. 127            1 byte:  the value itself
 *   128 .. 16511        2 bytes: (len - 128) big endian, first byte + 128
 *   16512 .. 2^30+126   4 bytes: (len - 16511) big endian, first byte + 192
 *
 * buf:    destination, must not be null
 * buflen: size of buf; at least 4 is required regardless of len
 * len:    value to encode
 *
 * Returns the number of bytes written (1, 2 or 4), or -1 if buf is null,
 * buflen is below 4 or len is too large to be encoded.
 */
static int encode_len(char *buf, int buflen, __u32 len)
{
	__u32 rest;

	if (buf == 0 || buflen < 4)
		return -1;

	if (len <= 127) {
		buf[0] = (__u8) len;
		return 1;
	}

	if (len <= 16511) {
		rest = len - 128;
		buf[0] = (__u8) (rest / 256 + 128);
		buf[1] = (__u8) (rest & 255);
		return 2;
	}

	if (len >= 1073741951)
		return -1;

	/* most significant byte first; its top two bits mark the 4 byte form */
	rest = len - 16511;
	buf[0] = (char) (((rest >> 24) & 255) + 192);
	buf[1] = (char) ((rest >> 16) & 255);
	buf[2] = (char) ((rest >> 8) & 255);
	buf[3] = (char) (rest & 255);
	return 4;
}
/*
 * Decode a variable length encoded value from the start of buf.
 *
 * The first byte selects the form: up to 127 is a 1 byte value, up to 191
 * starts a 2 byte value and everything above starts a 4 byte value (see
 * encode_len for the layout).
 *
 * buf:    encoded bytes; only read when buflen is at least 1
 * buflen: number of valid bytes in buf
 * len:    receives the decoded value on success
 *
 * Returns the number of bytes consumed (1, 2 or 4), or 0 if buf does not
 * yet hold the complete encoding; *len is left untouched in that case.
 */
static int decode_len(char *buf, int buflen, __u32 *len)
{
	__u8 first;

	if (buflen < 1)
		return 0;

	first = (__u8) buf[0];

	if (first <= 127) {
		*len = first;
		return 1;
	}

	if (buflen < 2)
		return 0;

	if (first <= 191) {
		*len = 128 + ((__u32) first - 128) * 256 + ((__u8) buf[1]);
		return 2;
	}

	if (buflen < 4)
		return 0;

	/* big endian value with the form marker removed from the top byte */
	*len = ((((__u32) ((__u8) (first - 192))) << 24) |
			(((__u32) ((__u8) buf[1])) << 16) |
			(((__u32) ((__u8) buf[2])) << 8) |
			((__u32) ((__u8) buf[3]))) + 16511;

	return 4;
}
332 static int read_len(int fd, struct libcor_nonblock_resumeinfo *nr,
333 __u32 *len, __u32 *maxread)
335 int rc = RC_CONNBROKEN;
337 if (unlikely(nr->type != RESUME_TYPE_READ)) {
338 assert(0);
339 exit(1);
340 return RC_CONNBROKEN;
343 if (unlikely(nr->data.read.read_len.state != 0 && (
344 nr->data.read.read_len.fd != fd ||
345 nr->data.read.read_len.len != len ||
346 nr->data.read.read_len.maxread != maxread))) {
347 assert(0);
348 exit(1);
349 return RC_CONNBROKEN;
352 if (sizeof(nr->data.read.read_len.buf) != 4) {
353 assert(0);
354 exit(1);
355 return RC_CONNBROKEN;
358 if (nr->data.read.read_len.state == 1) {
359 goto state_1;
360 } else if (unlikely(nr->data.read.read_len.state != 0)) {
361 assert(0);
362 exit(1);
363 return RC_CONNBROKEN;
366 nr->data.read.read_len.fd = fd;
367 nr->data.read.read_len.len = len;
368 nr->data.read.read_len.maxread = maxread;
369 bzero(&(nr->data.read.read_len.buf[0]), 4);
370 nr->data.read.read_len.read = 0;
373 while(1) {
374 nr->data.read.read_len.state = 1;
375 state_1:
376 if (nr->data.read.read_len.read >= 4) {
377 printf("error in readlen: read to high\n");
378 goto out_connbroken;
381 rc = read_fully(fd, nr, &(nr->data.read.read_len.buf[0]) +
382 nr->data.read.read_len.read, 1, maxread);
383 if (rc != RC_OK)
384 return rc;
386 nr->data.read.read_len.read++;
388 rc = decode_len(&(nr->data.read.read_len.buf[0]),
389 nr->data.read.read_len.read, len);
390 if (rc > 0) {
391 if (unlikely(rc < nr->data.read.read_len.read)) {
392 printf("error in readlen: decode_len has not "
393 "consumed the whole buffer\n");
394 goto out_connbroken;
396 rc = RC_OK;
397 break;
401 if (0) {
402 out_connbroken:
403 rc = RC_CONNBROKEN;
405 out:
407 if (rc != RC_WOULDBLOCK) {
408 bzero(&(nr->data.read.read_len),
409 sizeof(nr->data.read.read_len));
412 return rc;
415 static int send_cmd_hdr(int fd, __u16 cmd, __u32 len)
417 char cmdlenbuf[6];
418 int rc;
420 #warning todo commands are sent via multiple packets
422 cmd = htons(cmd);
424 cmdlenbuf[0] = ((char *) &cmd)[0];
425 cmdlenbuf[1] = ((char *) &cmd)[1];
427 rc = encode_len(cmdlenbuf+2, 4, len);
429 if (rc <= 0)
430 return RC_CONNBROKEN;
432 rc = send_fully(fd, cmdlenbuf, 2 + rc);
433 if (rc != RC_OK)
434 return RC_CONNBROKEN;
436 return RC_OK;
439 static int send_cmd(int fd, __u16 cmd, char *buf, __u32 len)
441 int rc = send_cmd_hdr(fd, cmd, len);
443 if (rc != RC_OK)
444 return RC_CONNBROKEN;
446 rc = send_fully(fd, buf, len);
447 if (rc != RC_OK)
448 return RC_CONNBROKEN;
450 return RC_OK;
453 int read_resp_nonblock(int fd, struct libcor_nonblock_resumeinfo *nr)
455 int rc;
457 struct libcor_nonblock_resumeinfo_resp *nr_resp =
458 &(nr->data.read.funcdata.resp);
460 if (nr->type == RESUME_TYPE_NONE) {
461 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
462 nr->type = RESUME_TYPE_READ;
463 nr->data.read.functype = RESUME_READ_FUNC_RESP;
464 nr_resp->fd = fd;
465 } else if (unlikely(nr->type != RESUME_TYPE_READ ||
466 nr->data.read.functype != RESUME_READ_FUNC_RESP)) {
467 assert(0);
468 exit(1);
469 return RC_CONNBROKEN;
470 } else if (unlikely(nr_resp->fd != fd)) {
471 assert(0);
472 exit(1);
473 return RC_CONNBROKEN;
474 } else {
475 __u8 state = nr_resp->state;
477 if (state == 1) {
478 goto state_1;
479 } else if (state == 2) {
480 goto state_2;
481 } else if (state == 3) {
482 goto state_3;
483 } else if (unlikely(state != 0)) {
484 assert(0);
485 exit(1);
486 return RC_CONNBROKEN;
490 nr_resp->respcode = 0;
491 rc = read_fully(fd, nr, (char *) &(nr_resp->respcode), 1, 0);
493 if (rc != RC_OK)
494 return bzero_nr_iffinished(nr, rc);
496 if (nr_resp->respcode == CDR_EXECFAILED) {
497 nr_resp->reasoncode = 0;
499 nr_resp->state = 1;
500 state_1:
501 rc = read_fully(fd, nr, (char *) &(nr_resp->reasoncode), 2, 0);
502 if (rc != RC_OK)
503 return bzero_nr_iffinished(nr, rc);
505 nr_resp->reasoncode = ntohs(nr_resp->reasoncode);
507 printf("execfailed: reasoncode = %d\n", (__s32)
508 nr_resp->reasoncode);
510 nr_resp->state = 2;
511 state_2:
512 rc = read_len(fd, nr, &(nr_resp->reasonlen),
514 if (rc != RC_OK)
515 return bzero_nr_iffinished(nr, rc);
517 nr_resp->state = 3;
518 state_3:
519 rc = read_discard(fd, nr, nr_resp->reasonlen);
520 if (rc != RC_OK)
521 return bzero_nr_iffinished(nr, rc);
524 while (reasonlen > 0) {
525 __u32 nextread = reasonlen;
527 if (reasonlen > 4093)
528 nextread = 4093;
529 reasonlen -= nextread;
530 int rc = read_fully(fd, buf, nextread, 0);
532 if (rc)
533 return rc;
535 buf[nextread] = 0;
536 printf("%s", buf);
538 printf("\n");*/
541 if (nr_resp->respcode == CDR_EXECOK)
542 return bzero_nr_iffinished(nr, RC_OK);
544 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
547 int read_resp(int fd)
549 int rc = RC_WOULDBLOCK;
550 struct libcor_nonblock_resumeinfo nr;
551 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
553 rc = read_resp_nonblock(fd, &nr);
554 if (unlikely(rc == RC_WOULDBLOCK)) {
555 assert(0);
556 rc = RC_CONNBROKEN;
558 return rc;
562 int send_connect_neigh(int fd, __u32 addrlen, char *addr)
564 char buf[8];
566 __u32 len = 0;
567 int rc;
569 rc = encode_len(buf, 4, addrlen);
570 if (unlikely(rc <= 0))
571 return RC_CONNBROKEN;
573 len += rc;
575 /* printf("send_connect_neigh %d %d %c%c\n", (int) addrtypelen,
576 * (int) addrlen, (char) addrtype[0],
577 * (char) addrtype[1]); */
579 rc = send_cmd_hdr(fd, CD_CONNECT_NB, len + addrlen);
580 if (rc != RC_OK)
581 return RC_CONNBROKEN;
583 rc = send_fully(fd, buf, len);
584 if (rc != RC_OK)
585 return RC_CONNBROKEN;
587 rc = send_fully(fd, addr, addrlen);
588 if (rc != RC_OK)
589 return RC_CONNBROKEN;
591 return rc;
594 int send_connect_port(int fd, __u64 port)
596 __u64 p = htonll(port);
597 return send_cmd(fd, CD_CONNECT_PORT, (char *) &p, 8);
600 int send_list_neigh(int fd, __u32 limit, __u32 offset)
602 char buf[8];
603 __u32 len = 0;
604 int rc;
606 rc = encode_len(buf, 4, limit);
607 if (unlikely(rc <= 0))
608 return RC_CONNBROKEN;
610 len += rc;
612 rc = encode_len(buf + len, 4, offset);
613 if (unlikely(rc <= 0))
614 return RC_CONNBROKEN;
616 len += rc;
618 return send_cmd(fd, CD_LIST_NEIGH, buf, len);
621 static int field_present(struct listneigh_field *fields, __u32 numfields,
622 __u16 field)
624 __u64 u;
625 for(u=0;u<numfields;u++) {
626 if (fields[u].field == field)
627 return 1;
629 return 0;
632 int read_neigh_list_nonblock(int fd, struct libcor_nonblock_resumeinfo *nr,
633 void *ptr,
634 void (*init)(void *ptr, __u32 totalneighs, __u32 responserows),
635 void (*next_neigh)(void *ptr, __u32 addrlen, char *addr))
637 int rc;
639 struct libcor_nonblock_resumeinfo_neighlist *nr_nl =
640 &(nr->data.read.funcdata.neighlist);
642 if (nr->type == RESUME_TYPE_NONE) {
643 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
644 nr->type = RESUME_TYPE_READ;
645 nr->data.read.functype = RESUME_READ_FUNC_NEIGHLIST;
646 nr_nl->fd = fd;
647 nr_nl->ptr = ptr;
648 nr_nl->init = init;
649 nr_nl->next_neigh = next_neigh;
650 } else if (unlikely(nr->type != RESUME_TYPE_READ ||
651 nr->data.read.functype != RESUME_READ_FUNC_NEIGHLIST)) {
652 assert(0);
653 exit(1);
654 return RC_CONNBROKEN;
655 } else if (unlikely(nr_nl->fd != fd ||
656 nr_nl->ptr != ptr ||
657 nr_nl->init != init ||
658 nr_nl->next_neigh != next_neigh)) {
659 assert(0);
660 exit(1);
661 return RC_CONNBROKEN;
662 } else {
663 __u8 state = nr_nl->state;
664 if (state == 1) {
665 goto state_1;
666 } else if (state == 2) {
667 goto state_2;
668 } else if (state == 3) {
669 goto state_3;
670 } else if (state == 4) {
671 goto state_4;
672 } else if (state == 5) {
673 goto state_5;
674 } else if (state == 6) {
675 goto state_6;
676 } else if (state == 7) {
677 goto state_7;
678 } else if (state == 8) {
679 goto state_8;
680 } else if (state == 9) {
681 goto state_9;
682 } else if (state ==10) {
683 goto state_10;
684 } else if (state == 11) {
685 goto state_11;
686 } else if (unlikely(state != 0)) {
687 assert(0);
688 exit(1);
689 return RC_CONNBROKEN;
693 nr_nl->respcode = 0;
695 rc = read_fully(fd, nr, (char *) &(nr_nl->respcode), 1, 0);
696 if (rc != RC_OK)
697 return bzero_nr_iffinished(nr, rc);
699 if (unlikely(nr_nl->respcode != CDR_BINDATA))
700 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
702 nr_nl->state = 1;
703 state_1:
704 rc = read_len(fd, nr, &(nr_nl->len), 0);
705 if (rc != RC_OK)
706 return bzero_nr_iffinished(nr, rc);
708 nr_nl->state = 2;
709 state_2:
710 rc = read_len(fd, nr, &(nr_nl->totalneighs), &(nr_nl->len));
711 if (rc != RC_OK)
712 return bzero_nr_iffinished(nr, rc);
714 nr_nl->state = 3;
715 state_3:
716 rc = read_len(fd, nr, &(nr_nl->response_rows), &(nr_nl->len));
717 if (rc != RC_OK)
718 return bzero_nr_iffinished(nr, rc);
720 nr_nl->state = 4;
721 state_4:
722 rc = read_len(fd, nr, &(nr_nl->numfields), &(nr_nl->len));
723 if (rc != RC_OK)
724 return bzero_nr_iffinished(nr, rc);
726 if (unlikely(nr_nl->numfields > NEIGHLIST_MAX_FIELDS))
727 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
729 for(nr_nl->u=0;nr_nl->u<nr_nl->numfields;nr_nl->u++) {
730 nr_nl->fields[nr_nl->u].field = 0;
732 nr_nl->state = 5;
733 state_5:
734 rc = read_fully(fd, nr,
735 (char *) &(nr_nl->fields[nr_nl->u].field),
736 2, &(nr_nl->len));
737 if (rc != RC_OK)
738 return bzero_nr_iffinished(nr, rc);
739 nr_nl->fields[nr_nl->u].field =
740 ntohs(nr_nl->fields[nr_nl->u].field);
742 nr_nl->state = 6;
743 state_6:
744 nr_nl->fieldlen = 0;
745 rc = read_len(fd, nr, &(nr_nl->fieldlen), &(nr_nl->len));
746 if (rc != RC_OK)
747 return bzero_nr_iffinished(nr, rc);
749 if (unlikely(nr_nl->fieldlen > 65535))
750 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
752 nr_nl->fields[nr_nl->u].len = (__u16) nr_nl->fieldlen;
753 nr_nl->fieldlen = 0;
756 if (unlikely(field_present(nr_nl->fields, nr_nl->numfields,
757 LIST_NEIGH_FIELD_ADDR) == 0))
758 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
760 init(ptr, nr_nl->totalneighs, nr_nl->response_rows);
762 for (nr_nl->u=0;nr_nl->u<nr_nl->response_rows;nr_nl->u++) {
763 bzero(&(nr_nl->addr[0]), ADDR_MAX_LEN);
764 nr_nl->addrlen = 0;
765 nr_nl->latency = 0;
767 for(nr_nl->v=0;nr_nl->v<nr_nl->numfields;nr_nl->v++) {
768 nr_nl->fieldlen = nr_nl->fields[nr_nl->v].len;
769 if (nr_nl->fieldlen == 0) {
770 nr_nl->state = 7;
771 state_7:
772 rc = read_len(fd, nr, &(nr_nl->fieldlen),
773 &(nr_nl->len));
774 if (rc != RC_OK)
775 return bzero_nr_iffinished(nr, rc);
778 if (nr_nl->fieldlen > nr_nl->len)
779 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
781 nr_nl->len -= nr_nl->fieldlen;
783 if (field_present(nr_nl->fields, nr_nl->v,
784 nr_nl->fields[nr_nl->v].field)) {
785 goto discard_field;
786 } else if (nr_nl->fields[nr_nl->v].field ==
787 LIST_NEIGH_FIELD_ADDR) {
788 nr_nl->state = 8;
789 state_8:
790 rc = read_len(fd, nr, &(nr_nl->addrlen),
791 &(nr_nl->fieldlen));
792 if (rc != RC_OK)
793 return bzero_nr_iffinished(nr, rc);
795 if (unlikely(nr_nl->addrlen > ADDR_MAX_LEN))
796 goto discard_field;
798 if (sizeof(nr_nl->addr) != ADDR_MAX_LEN) {
799 assert(0);
800 exit(1);
801 return RC_CONNBROKEN;
803 bzero(&(nr_nl->addr[0]), ADDR_MAX_LEN);
805 nr_nl->state = 9;
806 state_9:
807 rc = read_fully(fd, nr, &(nr_nl->addr[0]),
808 nr_nl->addrlen,
809 &(nr_nl->fieldlen));
810 if (rc != RC_OK)
811 return bzero_nr_iffinished(nr, rc);
812 } else if (nr_nl->fields[nr_nl->v].field ==
813 LIST_NEIGH_FIELD_LATENCY) {
814 nr_nl->latency = 0;
816 nr_nl->state = 10;
817 state_10:
818 rc = read_fully(fd, nr, &nr_nl->latency, 1,
819 &(nr_nl->fieldlen));
820 if (rc != RC_OK)
821 return bzero_nr_iffinished(nr, rc);
823 printf("latency %d\n", (int) nr_nl->latency);
826 discard_field:
827 if (nr_nl->fieldlen > 0) {
828 nr_nl->state = 11;
829 state_11:
830 rc = read_discard(fd, nr, nr_nl->fieldlen);
831 if (rc != RC_OK)
832 return bzero_nr_iffinished(nr, rc);
835 nr_nl->fieldlen = 0;
838 if (nr_nl->addrlen > ADDR_MAX_LEN) {
839 assert(0);
840 exit(1);
841 return RC_CONNBROKEN;
843 next_neigh(ptr, nr_nl->addrlen, &(nr_nl->addr[0]));
844 bzero(&(nr_nl->addr[0]), ADDR_MAX_LEN);
845 nr_nl->addrlen = 0;
846 nr_nl->latency = 0;
849 return rc;
852 int read_neigh_list(int fd, void *ptr,
853 void (*init)(void *ptr, __u32 totalneighs, __u32 responserows),
854 void (*next_neigh)(void *ptr, __u32 addrlen, char *addr))
856 int rc = RC_WOULDBLOCK;
857 struct libcor_nonblock_resumeinfo nr;
858 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
860 rc = read_neigh_list_nonblock(fd, &nr, ptr, init, next_neigh);
861 if (unlikely(rc == RC_WOULDBLOCK)) {
862 assert(0);
863 rc = RC_CONNBROKEN;
865 return rc;
868 int pass_socket(int fd, __u64 cookie)
870 int rc;
872 rc = setsockopt(fd, SOL_COR, COR_PASS_ON_CLOSE, &cookie, 8);
873 if (rc != 0) {
874 perror("pass_socket");
875 return RC_CONNBROKEN;
878 close(fd);
880 return RC_OK;
883 static int send_rdsock_cmd(int fd, __u32 cmd, char *data, __u32 datalen)
885 int rc;
887 char hdr[8];
888 __u32 be_cmd;
889 __u32 be_datalen;
891 be_cmd = htonl(cmd);
892 be_datalen = htonl(datalen);
894 memcpy(&(hdr[0]), (char *) &be_cmd, 4);
895 memcpy(&(hdr[4]), (char *) &be_datalen, 4);
897 rc = send_fully(fd, hdr, 8);
898 if (rc != RC_OK)
899 return RC_CONNBROKEN;
901 rc = send_fully(fd, data, datalen);
902 if (rc != RC_OK)
903 return RC_CONNBROKEN;
906 int send_rdsock_connecterror(int fd, __u64 cookie, __u32 error)
908 char data[12];
910 error = htonl(error);
912 memcpy(&(data[0]), (char *) &cookie, 8);
913 memcpy(&(data[8]), (char *) &error, 4);
915 return send_rdsock_cmd(fd, CRD_UTK_CONNECTERROR, &(data[0]), 12);
918 int send_rdsock_up(int fd, char *addr, __u32 addrlen)
920 char data[72];
922 __u64 flags = 0;
924 __u32 addrlenbe;
926 if (addr == 0 && addrlen != 0)
927 return RC_CONNBROKEN;
929 if (addrlen > 64)
930 return RC_CONNBROKEN;
932 flags = htonll(flags);
933 addrlenbe = htonl(addrlen);
935 memcpy(&(data[0]), (char *) &flags, 8);
936 memcpy(&(data[8]), (char *) &addrlenbe, 4);
937 if (addrlen > 0)
938 memcpy(&(data[12]), addr, addrlen);
940 return send_rdsock_cmd(fd, CRD_UTK_UP, &(data[0]), 12 + addrlen);
943 static int proc_rdsock_connect(int fd, void *ptr, __u32 cmd, char *cmddata,
944 __u32 cmdlen, struct rdsock_opts *opts)
946 __u64 cookie;
947 struct cor_sockaddr addr;
949 memcpy((char *) &cookie, cmddata, 8);
951 if ((sizeof(struct cor_sockaddr) + 8) > cmdlen)
952 return RC_CONNBROKEN;
954 memcpy((char *) &addr, cmddata + 8, sizeof(struct cor_sockaddr));
956 if (opts->connect == 0) {
957 return send_rdsock_connecterror(fd, cookie,
958 CONNECTERROR_NETUNREACH);
959 } else {
960 return opts->connect(fd, ptr, cookie, &addr);
964 static int proc_rdsock_cmd(int fd, void *ptr, __u32 cmd, char *cmddata,
965 __u32 cmdlen, struct rdsock_opts *opts)
967 if (cmd == CRD_KTU_CONNECT) {
968 return proc_rdsock_connect(fd, ptr, cmd, cmddata, cmdlen, opts);
969 } else {
970 printf("error in proc_rdsock_cmd: unknown cmd: %u\n", cmd);
971 return RC_CONNBROKEN;
975 #warning todo
976 static int read_rdsock_cmd_nonblock(int fd,
977 struct libcor_nonblock_resumeinfo *nr,
978 void *ptr, struct rdsock_opts *opts)
980 int rc;
982 struct libcor_nonblock_resumeinfo_rdsockcmd *nr_rd =
983 &(nr->data.read.funcdata.rdsock_cmd);
985 if (nr->type == RESUME_TYPE_NONE) {
986 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
987 nr->type = RESUME_TYPE_READ;
988 nr->data.read.functype = RESUME_READ_FUNC_RDSOCK_CMD;
989 nr_rd->fd = fd;
990 nr_rd->ptr = ptr;
991 nr_rd->opts = opts;
992 } else if (unlikely(nr->type != RESUME_TYPE_READ ||
993 nr->data.read.functype != RESUME_READ_FUNC_RDSOCK_CMD)){
994 assert(0);
995 exit(1);
996 return RC_CONNBROKEN;
997 } else if (unlikely(nr_rd->fd != fd || nr_rd->ptr != ptr ||
998 nr_rd->opts != opts)) {
999 assert(0);
1000 exit(1);
1001 return RC_CONNBROKEN;
1002 } else {
1003 __u8 state = nr_rd->state;
1005 if (state == 1) {
1006 goto state_1;
1007 } else if (state == 2) {
1008 goto state_2;
1009 } else if (unlikely(state != 0)) {
1010 assert(0);
1011 exit(1);
1012 return RC_CONNBROKEN;
1016 if (sizeof(nr_rd->buf) != 8) {
1017 assert(0);
1018 exit(1);
1019 return RC_CONNBROKEN;
1022 rc = read_fully(fd, nr, (char *) &(nr_rd->buf[0]), 8, 0);
1023 if (rc != RC_OK)
1024 return RC_CONNBROKEN;
1026 ((char *) &(nr_rd->cmd))[0] = nr_rd->buf[0];
1027 ((char *) &(nr_rd->cmd))[1] = nr_rd->buf[1];
1028 ((char *) &(nr_rd->cmd))[2] = nr_rd->buf[2];
1029 ((char *) &(nr_rd->cmd))[3] = nr_rd->buf[3];
1030 ((char *) &(nr_rd->cmddatalen))[0] = nr_rd->buf[4];
1031 ((char *) &(nr_rd->cmddatalen))[1] = nr_rd->buf[5];
1032 ((char *) &(nr_rd->cmddatalen))[2] = nr_rd->buf[6];
1033 ((char *) &(nr_rd->cmddatalen))[3] = nr_rd->buf[7];
1035 nr_rd->cmd = ntohl(nr_rd->cmd);
1036 nr_rd->cmddatalen = ntohl(nr_rd->cmddatalen);
1038 if (nr_rd->cmddatalen > 65536)
1039 goto discard;
1041 nr_rd->cmddata = malloc(nr_rd->cmddatalen);
1042 if (nr_rd->cmddata == 0)
1043 goto discard;
1045 nr_rd->state = 1;
1046 state_1:
1047 rc = read_fully(fd, nr, nr_rd->cmddata, nr_rd->cmddatalen, 0);
1048 if (rc != RC_OK) {
1049 rc = RC_CONNBROKEN;
1050 goto out;
1053 proc_rdsock_cmd(fd, ptr, nr_rd->cmd, nr_rd->cmddata, nr_rd->cmddatalen,
1054 opts);
1056 out:
1057 free(nr_rd->cmddata);
1059 if (0) {
1060 discard:
1061 nr_rd->state = 2;
1062 state_2:
1063 rc = read_discard(fd, nr, nr_rd->cmddatalen);
1064 if (rc != RC_OK)
1065 rc = RC_CONNBROKEN;
1067 return rc;
1070 int read_rdsock_cmd(int fd, void *ptr, struct rdsock_opts *opts)
1072 int rc = RC_WOULDBLOCK;
1073 struct libcor_nonblock_resumeinfo nr;
1074 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
1076 rc = read_rdsock_cmd_nonblock(fd, &nr, ptr, opts);
1077 if (unlikely(rc == RC_WOULDBLOCK)) {
1078 assert(0);
1079 rc = RC_CONNBROKEN;
1081 return rc;