nonblocking writes
[corutils.git] / libcor.c
blob64392983b5e4d880342144574e000d0b8125d7c3
1 /**
2 * Connection oriented routing user space utils
3 * Copyright (C) 2009-2011
4 * Authors:
5 * Michael Blizek
7 * This program is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU General Public License
9 * as published by the Free Software Foundation; either version 2
10 * of the License, or (at your option) any later version.
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
20 * 02110-1301, USA.
23 #include <linux/types.h>
24 #include <errno.h>
25 #include <stdio.h>
26 #include <string.h>
27 #include <assert.h>
29 #include "libcor.h"
31 #define likely(x) __builtin_expect((x), 1)
32 #define unlikely(x) __builtin_expect((x), 0)
35 #define CD_CONNECT_NB 1
36 #define CD_CONNECT_PORT 2
37 #define CD_LIST_NEIGH 3
38 #define CD_SET_FORWARD_TIMEOUT 4
39 #define CD_SET_BACKWARD_TIMEOUT 5
41 #define CDR_EXECOK 1
43 #define CDR_EXECFAILED 2
44 #define CDR_EXECFAILED_UNKNOWN_COMMAND 1
45 #define CDR_EXECFAILED_PERMISSION_DENIED 2
46 #define CDR_EXECFAILED_TEMPORARILY_OUT_OF_RESSOURCES 3
47 #define CDR_EXECFAILED_CMD_TOO_SHORT 4
48 #define CDR_EXECFAILED_CMD_TOO_LONG 5
49 #define CDR_EXECFAILED_TARGETADDR_DOESNTEXIST 6
50 #define CDR_EXECFAILED_PORTCLOSED 7
51 #define CDR_EXECFAILED_LISTENERQUEUE_FULL 8
52 #define CDR_EXECFAILED_ILLEGAL_COMMAND 9
54 #define CDR_BINDATA 3
56 #define LIST_NEIGH_FIELD_ADDR 1
57 #define LIST_NEIGH_FIELD_LATENCY 2
60 static int bzero_nr_iffinished(struct libcor_nonblock_resumeinfo *nr, int rc)
62 if (rc != RC_WOULDBLOCK) {
63 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
65 return rc;
/*
 * Terminate the process on an internal consistency error.
 *
 * assert(0) reports the location in debug builds; exit(1) still terminates
 * when assertions are compiled out (NDEBUG). The empty endless loop comes
 * after exit() and is therefore never reached.
 *
 * The body is wrapped in do { } while(0) so that the macro behaves like a
 * single statement, e.g. as the unbraced body of an if/else. The last line
 * must not end in a backslash: a trailing continuation would splice the
 * following source line into the macro.
 */
#define LIBCOR_ASSERT_ERR() \
	do { \
		assert(0); \
		exit(1); \
		while (1) { \
		} \
	} while(0)
76 #warning todo commands are sent via multiple packets
77 int resume_send(int fd, struct libcor_nonblock_resumeinfo *nr)
79 if (unlikely(nr->type != RESUME_TYPE_WRITE || nr->data.write.fd !=fd))
80 LIBCOR_ASSERT_ERR();
82 if (sizeof(nr->data.write.buf) != WRITE_BUF_SIZE ||
83 unlikely(nr->data.write.len > WRITE_BUF_SIZE))
84 LIBCOR_ASSERT_ERR();
86 while (nr->data.write.totalsent < nr->data.write.len) {
87 char *buf = &(nr->data.write.buf[0]);
88 int sent = send(fd, buf + nr->data.write.totalsent,
89 nr->data.write.len - nr->data.write.totalsent,
90 0);
92 if (sent == 0) {
93 printf("non-blocking write not supported");
94 return RC_CONNBROKEN;
97 if (sent == -1) {
98 if (errno == EINTR)
99 continue;
101 perror("send");
102 return RC_CONNBROKEN;
105 nr->data.write.totalsent += sent;
108 return RC_OK;
111 static int read_fully(int fd, struct libcor_nonblock_resumeinfo *nr,
112 char *buf, __u32 len, __u32 *maxread)
114 int rc = RC_OK;
116 if (unlikely(nr->type != RESUME_TYPE_READ))
117 LIBCOR_ASSERT_ERR();
119 if (unlikely(nr->data.read.read_fully.state != 0 && (
120 nr->data.read.read_fully.fd != fd ||
121 nr->data.read.read_fully.buf != buf ||
122 nr->data.read.read_fully.len != len ||
123 nr->data.read.read_fully.maxread != maxread)))
124 LIBCOR_ASSERT_ERR();
126 if (nr->data.read.read_fully.state == 1) {
127 goto state_1;
128 } else if (unlikely(nr->data.read.read_fully.state != 0)) {
129 LIBCOR_ASSERT_ERR();
132 nr->data.read.read_fully.fd = fd;
133 nr->data.read.read_fully.buf = buf;
134 nr->data.read.read_fully.len = len;
135 nr->data.read.read_fully.maxread = maxread;
136 nr->data.read.read_fully.totalread = 0;
138 nr->data.read.read_fully.state = 1;
139 state_1:
141 if (maxread != 0) {
142 if (len > (*maxread)) {
143 printf("error in read_fully: maxread reached/n");
144 rc = RC_CONNBROKEN;
145 goto out;
147 (*maxread) -= len;
150 while (len > nr->data.read.read_fully.totalread) {
151 int rcvd = recv(fd, buf + nr->data.read.read_fully.totalread,
152 len - nr->data.read.read_fully.totalread, 0);
153 int u;
155 if (rcvd == 0) {
156 printf("non-blocking read not supported");
157 rc = RC_WOULDBLOCK;
158 goto out;
161 if (rcvd < 0) {
162 if (errno == EINTR)
163 continue;
165 perror("recv");
166 rc = RC_CONNBROKEN;
167 goto out;
170 /*printf("rcvd: %d:", rcvd);
171 for(u=0;u<rcvd;u++) {
172 printf(" %d, ", (__s32) ((__u8) buf[totalread+u]));
174 printf("\n");*/
176 nr->data.read.read_fully.totalread += (__u32) rcvd;
179 out:
180 if (rc != RC_WOULDBLOCK) {
181 bzero(&(nr->data.read.read_fully),
182 sizeof(nr->data.read.read_fully));
185 return RC_OK;
188 static int read_discard(int fd, struct libcor_nonblock_resumeinfo *nr,
189 __u32 len)
191 char buf[128];
192 int rc = RC_OK;
194 if (unlikely(nr->type != RESUME_TYPE_READ))
195 LIBCOR_ASSERT_ERR();
197 if (unlikely(nr->data.read.read_discard.state != 0 && (
198 nr->data.read.read_discard.fd != fd ||
199 nr->data.read.read_discard.len != len)))
200 LIBCOR_ASSERT_ERR();
202 if (nr->data.read.read_discard.state == 1) {
203 goto state_1;
204 } else if (unlikely(nr->data.read.read_discard.state != 0)) {
205 LIBCOR_ASSERT_ERR();
208 nr->data.read.read_discard.fd = fd;
209 nr->data.read.read_discard.len = len;
210 nr->data.read.read_discard.discarded = 0;
213 nr->data.read.read_discard.state = 1;
214 state_1:
216 while (len > 0) {
217 int rcvd;
219 __u32 rcvlen = len - nr->data.read.read_discard.discarded;
220 if (rcvlen > 128)
221 rcvlen = 128;
223 rcvd = recv(fd, buf, rcvlen, 0);
225 if (rcvd == 0) {
226 rc = RC_WOULDBLOCK;
227 break;
230 if (rcvd < 0) {
231 if (errno == EINTR)
232 continue;
234 perror("recv");
235 rc = RC_CONNBROKEN;
236 goto out;
239 nr->data.read.read_discard.discarded -= rcvd;
242 out:
243 if (rc != RC_WOULDBLOCK) {
244 bzero(&(nr->data.read.read_discard),
245 sizeof(nr->data.read.read_discard));
248 return rc;
/*
 * Encode len in the variable-length format used for command parameters.
 *
 *   len <= 127          1 byte:  the value itself
 *   len <= 16511        2 bytes: first byte 128..191, value offset by 128
 *   len <  1073741951   4 bytes: big endian (len - 16511), with 192 added
 *                                to the first byte
 *
 * buf:    destination buffer
 * buflen: space available in buf; only the bytes actually needed for len
 *         are required
 * len:    value to encode
 *
 * Returns the number of bytes written (1, 2 or 4), or -1 if buf is null,
 * buf is too small for the encoding or len is too large to be encoded.
 */
static int encode_len(char *buf, int buflen, __u32 len)
{
	if (buf == 0)
		return -1;

	if (len <= 127) {
		if (buflen < 1)
			return -1;

		buf[0] = (__u8) len;
		return 1;
	}

	if (len <= 16511) {
		if (buflen < 2)
			return -1;

		buf[0] = (__u8) ((len - 128) / 256 + 128);
		buf[1] = (__u8) ((len - 128) & 255);
		return 2;
	}

	if (len < 1073741951) {
		/* the offset value is below 2^30, so its top byte is < 64 */
		__u32 len_be = htonl(len - 16511);
		char *len_p = (char *) &len_be;

		if (buflen < 4)
			return -1;

		buf[0] = len_p[0] + 192;
		buf[1] = len_p[1];
		buf[2] = len_p[2];
		buf[3] = len_p[3];
		return 4;
	}

	return -1;
}
/*
 * Decode a length written by encode_len().
 *
 * buf:    encoded bytes received so far
 * buflen: number of valid bytes in buf
 * len:    receives the decoded value
 *
 * The first byte selects the size of the encoding: 0..127 is a 1 byte
 * value, 128..191 starts a 2 byte value, everything above starts a 4 byte
 * value.
 *
 * Returns the number of bytes consumed (1, 2 or 4), or 0 if buf does not
 * yet hold the complete encoding (*len is left untouched in that case).
 * Null pointers are answered with 0 as well, mirroring the null check in
 * encode_len().
 */
static int decode_len(char *buf, int buflen, __u32 *len)
{
	__u8 first;
	__u32 len_be = 0;
	char *len_p = (char *) &len_be;

	if (buf == 0 || len == 0)
		return 0;

	if (buflen < 1)
		return 0;

	first = (__u8) buf[0];

	if (first <= 127) {
		*len = first;
		return 1;
	}

	if (buflen < 2)
		return 0;

	if (first <= 191) {
		*len = 128 + ((__u32) first - 128) * 256 + ((__u8) buf[1]);
		return 2;
	}

	if (buflen < 4)
		return 0;

	/* undo the marker added to the first byte, then the offset */
	len_p[0] = buf[0] - 192;
	len_p[1] = buf[1];
	len_p[2] = buf[2];
	len_p[3] = buf[3];

	*len = ntohl(len_be) + 16511;

	return 4;
}
316 static int read_len(int fd, struct libcor_nonblock_resumeinfo *nr,
317 __u32 *len, __u32 *maxread)
319 int rc = RC_CONNBROKEN;
321 if (unlikely(nr->type != RESUME_TYPE_READ))
322 LIBCOR_ASSERT_ERR();
324 if (unlikely(nr->data.read.read_len.state != 0 && (
325 nr->data.read.read_len.fd != fd ||
326 nr->data.read.read_len.len != len ||
327 nr->data.read.read_len.maxread != maxread)))
328 LIBCOR_ASSERT_ERR();
330 if (sizeof(nr->data.read.read_len.buf) != 4)
331 LIBCOR_ASSERT_ERR();
333 if (nr->data.read.read_len.state == 1) {
334 goto state_1;
335 } else if (unlikely(nr->data.read.read_len.state != 0)) {
336 LIBCOR_ASSERT_ERR();
339 nr->data.read.read_len.fd = fd;
340 nr->data.read.read_len.len = len;
341 nr->data.read.read_len.maxread = maxread;
342 bzero(&(nr->data.read.read_len.buf[0]), 4);
343 nr->data.read.read_len.read = 0;
346 while(1) {
347 nr->data.read.read_len.state = 1;
348 state_1:
349 if (nr->data.read.read_len.read >= 4) {
350 printf("error in readlen: read to high\n");
351 goto out_connbroken;
354 rc = read_fully(fd, nr, &(nr->data.read.read_len.buf[0]) +
355 nr->data.read.read_len.read, 1, maxread);
356 if (rc != RC_OK)
357 return rc;
359 nr->data.read.read_len.read++;
361 rc = decode_len(&(nr->data.read.read_len.buf[0]),
362 nr->data.read.read_len.read, len);
363 if (rc > 0) {
364 if (unlikely(rc < nr->data.read.read_len.read)) {
365 printf("error in readlen: decode_len has not "
366 "consumed the whole buffer\n");
367 goto out_connbroken;
369 rc = RC_OK;
370 break;
374 if (0) {
375 out_connbroken:
376 rc = RC_CONNBROKEN;
378 out:
380 if (rc != RC_WOULDBLOCK) {
381 bzero(&(nr->data.read.read_len),
382 sizeof(nr->data.read.read_len));
385 return rc;
388 static int _send_cmd(int fd, struct libcor_nonblock_resumeinfo *nr, __u16 cmd)
390 char buf[6];
391 int rc;
392 __u32 hdrlen = 0;
395 if (unlikely(nr->type != RESUME_TYPE_WRITE || nr->data.write.fd != fd))
396 LIBCOR_ASSERT_ERR();
398 cmd = htons(cmd);
399 buf[0] = ((char *) &cmd)[0];
400 buf[1] = ((char *) &cmd)[1];
402 rc = encode_len(&(buf[2]), 4, nr->data.write.len);
403 if (rc <= 0 || rc > 4)
404 LIBCOR_ASSERT_ERR();
406 if (sizeof(nr->data.write.buf) != WRITE_BUF_SIZE)
407 LIBCOR_ASSERT_ERR();
409 hdrlen = 2 + ((__u32) rc);
411 if (unlikely(nr->data.write.len + hdrlen < nr->data.write.len ||
412 nr->data.write.len + hdrlen > WRITE_BUF_SIZE))
413 LIBCOR_ASSERT_ERR();
415 memmove(&(nr->data.write.buf[hdrlen]), &(nr->data.write.buf[0]),
416 nr->data.write.len);
417 memcpy(&(nr->data.write.buf[0]), &(buf[0]), hdrlen);
418 nr->data.write.len += hdrlen;
420 return resume_send(fd, nr);
423 static int send_cmd(int fd, struct libcor_nonblock_resumeinfo *nr,
424 __u16 cmd, char *buf, __u32 len)
426 if (unlikely(nr->type != RESUME_TYPE_NONE))
427 LIBCOR_ASSERT_ERR();
429 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
430 nr->type = RESUME_TYPE_WRITE;
431 nr->data.write.fd = fd;
433 if (sizeof(nr->data.write.buf) != WRITE_BUF_SIZE)
434 LIBCOR_ASSERT_ERR();
436 if (unlikely(WRITE_BUF_SIZE < len))
437 LIBCOR_ASSERT_ERR();
439 memcpy(&(nr->data.write.buf[0]), buf, len);
440 nr->data.write.len = len;
442 return _send_cmd(fd, nr, cmd);
445 int read_resp_nonblock(int fd, struct libcor_nonblock_resumeinfo *nr)
447 int rc;
449 struct libcor_nonblock_resumeinfo_resp *nr_resp =
450 &(nr->data.read.funcdata.resp);
452 if (nr->type == RESUME_TYPE_NONE) {
453 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
454 nr->type = RESUME_TYPE_READ;
455 nr->data.read.functype = RESUME_READ_FUNC_RESP;
456 nr_resp->fd = fd;
457 } else if (unlikely(nr->type != RESUME_TYPE_READ ||
458 nr->data.read.functype != RESUME_READ_FUNC_RESP)) {
459 LIBCOR_ASSERT_ERR();
460 } else if (unlikely(nr_resp->fd != fd)) {
461 LIBCOR_ASSERT_ERR();
462 } else {
463 __u8 state = nr_resp->state;
465 if (state == 1) {
466 goto state_1;
467 } else if (state == 2) {
468 goto state_2;
469 } else if (state == 3) {
470 goto state_3;
471 } else if (unlikely(state != 0)) {
472 LIBCOR_ASSERT_ERR();
476 nr_resp->respcode = 0;
477 rc = read_fully(fd, nr, (char *) &(nr_resp->respcode), 1, 0);
479 if (rc != RC_OK)
480 return bzero_nr_iffinished(nr, rc);
482 if (nr_resp->respcode == CDR_EXECFAILED) {
483 nr_resp->reasoncode = 0;
485 nr_resp->state = 1;
486 state_1:
487 rc = read_fully(fd, nr, (char *) &(nr_resp->reasoncode), 2, 0);
488 if (rc != RC_OK)
489 return bzero_nr_iffinished(nr, rc);
491 nr_resp->reasoncode = ntohs(nr_resp->reasoncode);
493 printf("execfailed: reasoncode = %d\n", (__s32)
494 nr_resp->reasoncode);
496 nr_resp->state = 2;
497 state_2:
498 rc = read_len(fd, nr, &(nr_resp->reasonlen),
500 if (rc != RC_OK)
501 return bzero_nr_iffinished(nr, rc);
503 nr_resp->state = 3;
504 state_3:
505 rc = read_discard(fd, nr, nr_resp->reasonlen);
506 if (rc != RC_OK)
507 return bzero_nr_iffinished(nr, rc);
510 while (reasonlen > 0) {
511 __u32 nextread = reasonlen;
513 if (reasonlen > 4093)
514 nextread = 4093;
515 reasonlen -= nextread;
516 int rc = read_fully(fd, buf, nextread, 0);
518 if (rc)
519 return rc;
521 buf[nextread] = 0;
522 printf("%s", buf);
524 printf("\n");*/
527 if (nr_resp->respcode == CDR_EXECOK)
528 return bzero_nr_iffinished(nr, RC_OK);
530 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
533 int read_resp(int fd)
535 int rc = RC_WOULDBLOCK;
536 struct libcor_nonblock_resumeinfo nr;
537 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
539 rc = read_resp_nonblock(fd, &nr);
540 if (unlikely(rc == RC_WOULDBLOCK)) {
541 LIBCOR_ASSERT_ERR();
542 return bzero_nr_iffinished(&nr, RC_CONNBROKEN);
544 return rc;
548 int send_connect_neigh_nonblock(int fd, struct libcor_nonblock_resumeinfo *nr,
549 __u32 addrlen, char *addr)
551 __u32 hdrlen = 0;
552 int rc;
554 if (unlikely(nr->type != RESUME_TYPE_NONE))
555 LIBCOR_ASSERT_ERR();
557 if (unlikely(addrlen > (1024 * 1024 * 1024) ||
558 addrlen + 4 > WRITE_BUF_SIZE))
559 LIBCOR_ASSERT_ERR();
561 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
562 nr->type = RESUME_TYPE_WRITE;
563 nr->data.write.fd = fd;
565 if (sizeof(nr->data.write.buf) != WRITE_BUF_SIZE)
566 LIBCOR_ASSERT_ERR();
568 if (unlikely(WRITE_BUF_SIZE < 4))
569 LIBCOR_ASSERT_ERR();
571 rc = encode_len(&(nr->data.write.buf[nr->data.write.len]), 4, addrlen);
572 if (unlikely(rc <= 0 || rc > 4))
573 LIBCOR_ASSERT_ERR();
575 nr->data.write.len += (__u32) rc;
577 if (unlikely(nr->data.write.len > WRITE_BUF_SIZE ||
578 WRITE_BUF_SIZE - nr->data.write.len < addrlen))
579 LIBCOR_ASSERT_ERR();
581 memcpy(&(nr->data.write.buf[nr->data.write.len]), addr, addrlen);
582 nr->data.write.len += addrlen;
584 if (unlikely(nr->data.write.len > WRITE_BUF_SIZE))
585 LIBCOR_ASSERT_ERR();
587 return _send_cmd(fd, nr, CD_CONNECT_NB);
590 int send_connect_neigh(int fd, __u32 addrlen, char *addr)
592 int rc = RC_WOULDBLOCK;
593 struct libcor_nonblock_resumeinfo nr;
594 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
596 rc = send_connect_neigh_nonblock(fd, &nr, addrlen, addr);
597 if (unlikely(rc == RC_WOULDBLOCK)) {
598 LIBCOR_ASSERT_ERR();
599 return bzero_nr_iffinished(&nr, RC_CONNBROKEN);
601 return rc;
604 int send_connect_port_nonblock(int fd,
605 struct libcor_nonblock_resumeinfo *nr, __u64 port)
607 __u64 p = htonll(port);
608 return send_cmd(fd, nr, CD_CONNECT_PORT, (char *) &p, 8);
611 int send_connect_port(int fd, __u64 port)
613 int rc = RC_WOULDBLOCK;
614 struct libcor_nonblock_resumeinfo nr;
615 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
617 rc = send_connect_port_nonblock(fd, &nr, port);
618 if (unlikely(rc == RC_WOULDBLOCK)) {
619 LIBCOR_ASSERT_ERR();
620 return bzero_nr_iffinished(&nr, RC_CONNBROKEN);
622 return rc;
625 int send_list_neigh_nonblock(int fd, struct libcor_nonblock_resumeinfo *nr,
626 __u32 limit, __u32 offset)
628 char buf[8];
629 __u32 len = 0;
630 int rc;
632 rc = encode_len(buf, 4, limit);
633 if (unlikely(rc <= 0))
634 return RC_CONNBROKEN;
636 len += rc;
638 rc = encode_len(buf + len, 4, offset);
639 if (unlikely(rc <= 0))
640 return RC_CONNBROKEN;
642 len += rc;
644 return send_cmd(fd, nr, CD_LIST_NEIGH, buf, len);
647 int send_list_neigh(int fd, __u32 limit, __u32 offset)
649 int rc = RC_WOULDBLOCK;
650 struct libcor_nonblock_resumeinfo nr;
651 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
653 rc = send_list_neigh_nonblock(fd, &nr, limit, offset);
654 if (unlikely(rc == RC_WOULDBLOCK)) {
655 LIBCOR_ASSERT_ERR();
656 return bzero_nr_iffinished(&nr, RC_CONNBROKEN);
658 return rc;
661 static int field_present(struct listneigh_field *fields, __u32 numfields,
662 __u16 field)
664 __u64 u;
665 for(u=0;u<numfields;u++) {
666 if (fields[u].field == field)
667 return 1;
669 return 0;
672 int read_neigh_list_nonblock(int fd, struct libcor_nonblock_resumeinfo *nr,
673 void *ptr,
674 void (*init)(void *ptr, __u32 totalneighs, __u32 responserows),
675 void (*next_neigh)(void *ptr, __u32 addrlen, char *addr))
677 int rc;
679 struct libcor_nonblock_resumeinfo_neighlist *nr_nl =
680 &(nr->data.read.funcdata.neighlist);
682 if (nr->type == RESUME_TYPE_NONE) {
683 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
684 nr->type = RESUME_TYPE_READ;
685 nr->data.read.functype = RESUME_READ_FUNC_NEIGHLIST;
686 nr_nl->fd = fd;
687 nr_nl->ptr = ptr;
688 nr_nl->init = init;
689 nr_nl->next_neigh = next_neigh;
690 } else if (unlikely(nr->type != RESUME_TYPE_READ ||
691 nr->data.read.functype != RESUME_READ_FUNC_NEIGHLIST)) {
692 LIBCOR_ASSERT_ERR();
693 } else if (unlikely(nr_nl->fd != fd ||
694 nr_nl->ptr != ptr ||
695 nr_nl->init != init ||
696 nr_nl->next_neigh != next_neigh)) {
697 LIBCOR_ASSERT_ERR();
698 } else {
699 __u8 state = nr_nl->state;
700 if (state == 1) {
701 goto state_1;
702 } else if (state == 2) {
703 goto state_2;
704 } else if (state == 3) {
705 goto state_3;
706 } else if (state == 4) {
707 goto state_4;
708 } else if (state == 5) {
709 goto state_5;
710 } else if (state == 6) {
711 goto state_6;
712 } else if (state == 7) {
713 goto state_7;
714 } else if (state == 8) {
715 goto state_8;
716 } else if (state == 9) {
717 goto state_9;
718 } else if (state ==10) {
719 goto state_10;
720 } else if (state == 11) {
721 goto state_11;
722 } else if (unlikely(state != 0)) {
723 LIBCOR_ASSERT_ERR();
727 nr_nl->respcode = 0;
729 rc = read_fully(fd, nr, (char *) &(nr_nl->respcode), 1, 0);
730 if (rc != RC_OK)
731 return bzero_nr_iffinished(nr, rc);
733 if (unlikely(nr_nl->respcode != CDR_BINDATA))
734 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
736 nr_nl->state = 1;
737 state_1:
738 rc = read_len(fd, nr, &(nr_nl->len), 0);
739 if (rc != RC_OK)
740 return bzero_nr_iffinished(nr, rc);
742 nr_nl->state = 2;
743 state_2:
744 rc = read_len(fd, nr, &(nr_nl->totalneighs), &(nr_nl->len));
745 if (rc != RC_OK)
746 return bzero_nr_iffinished(nr, rc);
748 nr_nl->state = 3;
749 state_3:
750 rc = read_len(fd, nr, &(nr_nl->response_rows), &(nr_nl->len));
751 if (rc != RC_OK)
752 return bzero_nr_iffinished(nr, rc);
754 nr_nl->state = 4;
755 state_4:
756 rc = read_len(fd, nr, &(nr_nl->numfields), &(nr_nl->len));
757 if (rc != RC_OK)
758 return bzero_nr_iffinished(nr, rc);
760 if (unlikely(nr_nl->numfields > NEIGHLIST_MAX_FIELDS))
761 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
763 for(nr_nl->u=0;nr_nl->u<nr_nl->numfields;nr_nl->u++) {
764 nr_nl->fields[nr_nl->u].field = 0;
766 nr_nl->state = 5;
767 state_5:
768 rc = read_fully(fd, nr,
769 (char *) &(nr_nl->fields[nr_nl->u].field),
770 2, &(nr_nl->len));
771 if (rc != RC_OK)
772 return bzero_nr_iffinished(nr, rc);
773 nr_nl->fields[nr_nl->u].field =
774 ntohs(nr_nl->fields[nr_nl->u].field);
776 nr_nl->state = 6;
777 state_6:
778 nr_nl->fieldlen = 0;
779 rc = read_len(fd, nr, &(nr_nl->fieldlen), &(nr_nl->len));
780 if (rc != RC_OK)
781 return bzero_nr_iffinished(nr, rc);
783 if (unlikely(nr_nl->fieldlen > 65535))
784 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
786 nr_nl->fields[nr_nl->u].len = (__u16) nr_nl->fieldlen;
787 nr_nl->fieldlen = 0;
790 if (unlikely(field_present(nr_nl->fields, nr_nl->numfields,
791 LIST_NEIGH_FIELD_ADDR) == 0))
792 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
794 init(ptr, nr_nl->totalneighs, nr_nl->response_rows);
796 for (nr_nl->u=0;nr_nl->u<nr_nl->response_rows;nr_nl->u++) {
797 bzero(&(nr_nl->addr[0]), ADDR_MAX_LEN);
798 nr_nl->addrlen = 0;
799 nr_nl->latency = 0;
801 for(nr_nl->v=0;nr_nl->v<nr_nl->numfields;nr_nl->v++) {
802 nr_nl->fieldlen = nr_nl->fields[nr_nl->v].len;
803 if (nr_nl->fieldlen == 0) {
804 nr_nl->state = 7;
805 state_7:
806 rc = read_len(fd, nr, &(nr_nl->fieldlen),
807 &(nr_nl->len));
808 if (rc != RC_OK)
809 return bzero_nr_iffinished(nr, rc);
812 if (nr_nl->fieldlen > nr_nl->len)
813 return bzero_nr_iffinished(nr, RC_CONNBROKEN);
815 nr_nl->len -= nr_nl->fieldlen;
817 if (field_present(nr_nl->fields, nr_nl->v,
818 nr_nl->fields[nr_nl->v].field)) {
819 goto discard_field;
820 } else if (nr_nl->fields[nr_nl->v].field ==
821 LIST_NEIGH_FIELD_ADDR) {
822 nr_nl->state = 8;
823 state_8:
824 rc = read_len(fd, nr, &(nr_nl->addrlen),
825 &(nr_nl->fieldlen));
826 if (rc != RC_OK)
827 return bzero_nr_iffinished(nr, rc);
829 if (unlikely(nr_nl->addrlen > ADDR_MAX_LEN))
830 goto discard_field;
832 if (sizeof(nr_nl->addr) != ADDR_MAX_LEN)
833 LIBCOR_ASSERT_ERR();
835 bzero(&(nr_nl->addr[0]), ADDR_MAX_LEN);
837 nr_nl->state = 9;
838 state_9:
839 rc = read_fully(fd, nr, &(nr_nl->addr[0]),
840 nr_nl->addrlen,
841 &(nr_nl->fieldlen));
842 if (rc != RC_OK)
843 return bzero_nr_iffinished(nr, rc);
844 } else if (nr_nl->fields[nr_nl->v].field ==
845 LIST_NEIGH_FIELD_LATENCY) {
846 nr_nl->latency = 0;
848 nr_nl->state = 10;
849 state_10:
850 rc = read_fully(fd, nr, &nr_nl->latency, 1,
851 &(nr_nl->fieldlen));
852 if (rc != RC_OK)
853 return bzero_nr_iffinished(nr, rc);
855 printf("latency %d\n", (int) nr_nl->latency);
858 discard_field:
859 if (nr_nl->fieldlen > 0) {
860 nr_nl->state = 11;
861 state_11:
862 rc = read_discard(fd, nr, nr_nl->fieldlen);
863 if (rc != RC_OK)
864 return bzero_nr_iffinished(nr, rc);
867 nr_nl->fieldlen = 0;
870 if (nr_nl->addrlen > ADDR_MAX_LEN)
871 LIBCOR_ASSERT_ERR();
873 next_neigh(ptr, nr_nl->addrlen, &(nr_nl->addr[0]));
874 bzero(&(nr_nl->addr[0]), ADDR_MAX_LEN);
875 nr_nl->addrlen = 0;
876 nr_nl->latency = 0;
879 return rc;
882 int read_neigh_list(int fd, void *ptr,
883 void (*init)(void *ptr, __u32 totalneighs, __u32 responserows),
884 void (*next_neigh)(void *ptr, __u32 addrlen, char *addr))
886 int rc = RC_WOULDBLOCK;
887 struct libcor_nonblock_resumeinfo nr;
888 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
890 rc = read_neigh_list_nonblock(fd, &nr, ptr, init, next_neigh);
891 if (unlikely(rc == RC_WOULDBLOCK)) {
892 LIBCOR_ASSERT_ERR();
893 return bzero_nr_iffinished(&nr, RC_CONNBROKEN);
895 return rc;
898 int pass_socket(int fd, __u64 cookie)
900 int rc;
902 rc = setsockopt(fd, SOL_COR, COR_PASS_ON_CLOSE, &cookie, 8);
903 if (rc != 0) {
904 perror("pass_socket");
905 return RC_CONNBROKEN;
908 close(fd);
910 return RC_OK;
913 static int send_rdsock_cmd(int fd, struct libcor_nonblock_resumeinfo *nr,
914 __u32 cmd, char *data, __u32 datalen)
916 __u32 be_cmd = htonl(cmd);
917 __u32 be_datalen = htonl(datalen);
919 if (unlikely(nr->type != RESUME_TYPE_NONE))
920 LIBCOR_ASSERT_ERR();
922 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
923 nr->type = RESUME_TYPE_WRITE;
924 nr->data.write.fd = fd;
926 if (sizeof(nr->data.write.buf) != WRITE_BUF_SIZE)
927 LIBCOR_ASSERT_ERR();
929 if (unlikely(datalen + 8 < datalen || WRITE_BUF_SIZE < (datalen + 8)))
930 LIBCOR_ASSERT_ERR();
932 memcpy(&(nr->data.write.buf[0]), (char *) &be_cmd, 4);
933 memcpy(&(nr->data.write.buf[4]), (char *) &be_datalen, 4);
934 memcpy(&(nr->data.write.buf[8]), data, datalen);
935 nr->data.write.len = datalen + 9;
937 return resume_send(fd, nr);
940 int send_rdsock_connecterror_nonblock(int fd,
941 struct libcor_nonblock_resumeinfo *nr,
942 __u64 cookie, __u32 error)
944 char data[12];
946 error = htonl(error);
948 memcpy(&(data[0]), (char *) &cookie, 8);
949 memcpy(&(data[8]), (char *) &error, 4);
951 return send_rdsock_cmd(fd, nr, CRD_UTK_CONNECTERROR, &(data[0]), 12);
954 int send_rdsock_connecterror(int fd, __u64 cookie, __u32 error)
956 int rc = RC_WOULDBLOCK;
957 struct libcor_nonblock_resumeinfo nr;
958 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
960 rc = send_rdsock_connecterror_nonblock(fd, &nr, cookie, error);
961 if (unlikely(rc == RC_WOULDBLOCK)) {
962 LIBCOR_ASSERT_ERR();
963 return bzero_nr_iffinished(&nr, RC_CONNBROKEN);
965 return rc;
968 int send_rdsock_up_nonblock(int fd, struct libcor_nonblock_resumeinfo *nr,
969 char *addr, __u32 addrlen)
971 char data[72];
973 __u64 flags = 0;
975 __u32 addrlenbe;
977 if (addr == 0 && addrlen != 0)
978 return RC_CONNBROKEN;
980 if (addrlen > 64)
981 return RC_CONNBROKEN;
983 flags = htonll(flags);
984 addrlenbe = htonl(addrlen);
986 memcpy(&(data[0]), (char *) &flags, 8);
987 memcpy(&(data[8]), (char *) &addrlenbe, 4);
988 if (addrlen > 0)
989 memcpy(&(data[12]), addr, addrlen);
991 return send_rdsock_cmd(fd, nr, CRD_UTK_UP, &(data[0]), 12 + addrlen);
994 int send_rdsock_up(int fd, char *addr, __u32 addrlen)
996 int rc = RC_WOULDBLOCK;
997 struct libcor_nonblock_resumeinfo nr;
998 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
1000 rc = send_rdsock_up_nonblock(fd, &nr, addr, addrlen);
1001 if (unlikely(rc == RC_WOULDBLOCK)) {
1002 LIBCOR_ASSERT_ERR();
1003 return bzero_nr_iffinished(&nr, RC_CONNBROKEN);
1005 return rc;
1008 int parse_rdsock_connect(void *ptr, struct rdsock_cmd *cmd,
1009 int (*proc_connect)(void *ptr, __u64 cookie,
1010 struct cor_sockaddr *addr))
1012 __u64 cookie;
1013 struct cor_sockaddr addr;
1015 memcpy((char *) &cookie, cmd->cmddata, 8);
1017 if ((sizeof(struct cor_sockaddr) + 8) > cmd->cmddatalen)
1018 return 1;
1020 memcpy((char *) &addr, cmd->cmddata + 8, sizeof(struct cor_sockaddr));
1022 return proc_connect(ptr, cookie, &addr);
1025 int read_rdsock_cmd_nonblock(int fd,
1026 struct libcor_nonblock_resumeinfo *nr,
1027 struct rdsock_cmd *cmd)
1029 int rc;
1031 struct libcor_nonblock_resumeinfo_rdsockcmd *nr_rd =
1032 &(nr->data.read.funcdata.rdsock_cmd);
1034 bzero(cmd, sizeof(struct rdsock_cmd));
1036 if (nr->type == RESUME_TYPE_NONE) {
1037 bzero(nr, sizeof(struct libcor_nonblock_resumeinfo));
1038 nr->type = RESUME_TYPE_READ;
1039 nr->data.read.functype = RESUME_READ_FUNC_RDSOCK_CMD;
1040 nr_rd->fd = fd;
1041 } else if (unlikely(nr->type != RESUME_TYPE_READ ||
1042 nr->data.read.functype != RESUME_READ_FUNC_RDSOCK_CMD)){
1043 LIBCOR_ASSERT_ERR();
1044 } else if (unlikely(nr_rd->fd != fd)) {
1045 LIBCOR_ASSERT_ERR();
1046 } else {
1047 __u8 state = nr_rd->state;
1049 if (state == 1) {
1050 goto state_1;
1051 } else if (state == 2) {
1052 goto state_2;
1053 } else if (unlikely(state != 0)) {
1054 LIBCOR_ASSERT_ERR();
1058 if (sizeof(nr_rd->buf) != 8)
1059 LIBCOR_ASSERT_ERR();
1061 rc = read_fully(fd, nr, (char *) &(nr_rd->buf[0]), 8, 0);
1062 if (rc != RC_OK)
1063 return bzero_nr_iffinished(nr, rc);
1065 ((char *) &(nr_rd->cmd))[0] = nr_rd->buf[0];
1066 ((char *) &(nr_rd->cmd))[1] = nr_rd->buf[1];
1067 ((char *) &(nr_rd->cmd))[2] = nr_rd->buf[2];
1068 ((char *) &(nr_rd->cmd))[3] = nr_rd->buf[3];
1069 ((char *) &(nr_rd->cmddatalen))[0] = nr_rd->buf[4];
1070 ((char *) &(nr_rd->cmddatalen))[1] = nr_rd->buf[5];
1071 ((char *) &(nr_rd->cmddatalen))[2] = nr_rd->buf[6];
1072 ((char *) &(nr_rd->cmddatalen))[3] = nr_rd->buf[7];
1074 nr_rd->cmd = ntohl(nr_rd->cmd);
1075 nr_rd->cmddatalen = ntohl(nr_rd->cmddatalen);
1077 if (nr_rd->cmddatalen > 65536)
1078 goto discard;
1080 nr_rd->cmddata = malloc(nr_rd->cmddatalen);
1081 if (nr_rd->cmddata == 0)
1082 goto discard;
1084 nr_rd->state = 1;
1085 state_1:
1086 rc = read_fully(fd, nr, nr_rd->cmddata, nr_rd->cmddatalen, 0);
1087 if (rc != RC_OK) {
1088 if (rc != RC_WOULDBLOCK) {
1089 free(nr_rd->cmddata);
1090 nr_rd->cmddata = 0;
1092 return bzero_nr_iffinished(nr, rc);
1095 cmd->cmd = nr_rd->cmd;
1096 cmd->cmddata = nr_rd->cmddata;
1097 cmd->cmddatalen = nr_rd->cmddatalen;
1099 if (0) {
1100 discard:
1101 nr_rd->state = 2;
1102 state_2:
1103 rc = read_discard(fd, nr, nr_rd->cmddatalen);
1104 if (rc != RC_OK)
1105 return bzero_nr_iffinished(nr, rc);
1107 return bzero_nr_iffinished(nr, rc);
1110 int read_rdsock_cmd(int fd, struct rdsock_cmd *cmd)
1112 int rc = RC_WOULDBLOCK;
1113 struct libcor_nonblock_resumeinfo nr;
1114 bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo));
1116 rc = read_rdsock_cmd_nonblock(fd, &nr, cmd);
1117 if (unlikely(rc == RC_WOULDBLOCK)) {
1118 LIBCOR_ASSERT_ERR();
1119 return bzero_nr_iffinished(&nr, RC_CONNBROKEN);
1121 return rc;