From 4e3175c396358587fa3b84ed9a4470d57cab853c Mon Sep 17 00:00:00 2001 From: Michael Blizek Date: Sun, 4 May 2014 13:36:28 +0200 Subject: [PATCH] libcor: nonblocking read --- .gitignore | 15 ++ Makefile | 64 +++++ libcor.c | 855 ++++++++++++++++++++++++++++++++++++++++------------------ libcor.h | 126 ++++++++- test_route.c | 5 - test_routed.c | 5 - 6 files changed, 795 insertions(+), 275 deletions(-) create mode 100644 .gitignore create mode 100644 Makefile diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9a461d6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,15 @@ +libcor.o +test_connect +test_connect.o +test_connect2 +test_connect2.o +test_invalidconnect +test_invalidconnect.o +test_listen +test_listen.o +test_neigh +test_neigh.o +test_route +test_route.o +test_routed +test_routed.o diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d6cb6fd --- /dev/null +++ b/Makefile @@ -0,0 +1,64 @@ +#I know, this is quick and dirty... + + +all: libcor.o test_connect test_connect2 test_invalidconnect test_listen test_neigh test_route test_routed + +clean: + rm libcor.o test_connect test_connect.o test_connect2 test_connect2.o \ + test_invalidconnect test_invalidconnect.o \ + test_listen test_listen.o test_neigh test_neigh.o \ + test_route test_route.o test_routed test_routed.o + + +libcor.o: libcor.c + gcc -g -c libcor.c + + +test_connect: test_connect.o libcor.o + gcc -g -o test_connect test_connect.o libcor.o + +test_connect.o: test_connect.c libcor.h + gcc -g -c test_connect.c + + +test_connect2: test_connect2.o libcor.o + gcc -g -o test_connect2 test_connect2.o libcor.o + +test_connect2.o: test_connect2.c libcor.h + gcc -g -c test_connect2.c + + +test_invalidconnect: test_invalidconnect.o libcor.o + gcc -g -o test_invalidconnect test_invalidconnect.o libcor.o + +test_invalidconnect.o: test_invalidconnect.c libcor.h + gcc -g -c test_invalidconnect.c + + +test_listen: test_listen.o libcor.o + gcc -g -o test_listen test_listen.o libcor.o + +test_listen.o: test_listen.c libcor.h + gcc -g -c test_listen.c + + +test_neigh: test_neigh.o libcor.o + gcc -g -o test_neigh test_neigh.o libcor.o + +test_neigh.o: test_neigh.c libcor.h + gcc -g -c test_neigh.c + + +test_route: test_route.o libcor.o + gcc -g -o test_route test_route.o libcor.o + +test_route.o: test_route.c libcor.h + gcc -g -c test_route.c + + +test_routed: test_routed.o libcor.o + gcc -g -o test_routed test_routed.o libcor.o + +test_routed.o: test_routed.c libcor.h + gcc -g -c test_routed.c + diff --git a/libcor.c b/libcor.c index 2d7852b..6faa4f2 100644 --- a/libcor.c +++ b/libcor.c @@ -25,9 +25,13 @@ #include #include #include +#include #include "libcor.h" +#define likely(x) __builtin_expect((x), 1) +#define unlikely(x) __builtin_expect((x), 0) + #define CD_CONNECT_NB 1 #define CD_CONNECT_PORT 2 @@ -70,11 +74,14 @@ */ -struct listneigh_field { - __u16 field; - __u16 len; -}; - +static int bzero_nr_iffinished( + struct libcor_nonblock_resumeinfo *nr, int rc) +{ + if (rc != RC_WOULDBLOCK) { + bzero(nr, sizeof(struct libcor_nonblock_resumeinfo)); + } + return rc; +} static int send_fully(int fd, char *buf, __u32 len) { @@ -101,53 +108,168 @@ static int send_fully(int fd, char *buf, __u32 len) return RC_OK; } -static int read_fully(int fd, char *buf, __u32 len, __u32 *maxread) +static int read_fully(int fd, struct libcor_nonblock_resumeinfo *nr, + char *buf, __u32 len, __u32 *maxread) { + int rc = RC_OK; + + if (unlikely(nr->type != RESUME_TYPE_READ)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + + if (unlikely(nr->data.read.read_fully.state != 0 && ( + nr->data.read.read_fully.fd != fd || + nr->data.read.read_fully.buf != buf || + nr->data.read.read_fully.len != len || + nr->data.read.read_fully.maxread != maxread))) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + + if (nr->data.read.read_fully.state == 1) { + goto state_1; + } else if (unlikely(nr->data.read.read_fully.state != 0)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + + nr->data.read.read_fully.fd = fd; + nr->data.read.read_fully.buf = buf; + nr->data.read.read_fully.len = len; + nr->data.read.read_fully.maxread = maxread; + nr->data.read.read_fully.totalread = 0; + + nr->data.read.read_fully.state = 1; +state_1: + if (maxread != 0) { if (len > (*maxread)) { printf("error in read_fully: maxread reached/n"); - return RC_ERROR; + rc = RC_CONNBROKEN; + goto out; } (*maxread) -= len; } - while (len > 0) { - int rcvd = recv(fd, buf, len, 0); + while (len > nr->data.read.read_fully.totalread) { + int rcvd = recv(fd, buf + nr->data.read.read_fully.totalread, + len - nr->data.read.read_fully.totalread, 0); int u; if (rcvd == 0) { printf("non-blocking read not supported"); - return RC_CONNBROKEN; + rc = RC_WOULDBLOCK; + goto out; } - if (rcvd == -1) { + if (rcvd < 0) { if (errno == EINTR) continue; perror("recv"); - return RC_CONNBROKEN; + rc = RC_CONNBROKEN; + goto out; } /*printf("rcvd: %d:", rcvd); for(u=0;udata.read.read_fully.totalread += (__u32) rcvd; + } + +out: + if (rc != RC_WOULDBLOCK) { + bzero(&(nr->data.read.read_fully), + sizeof(nr->data.read.read_fully)); } return RC_OK; } +static int read_discard(int fd, struct libcor_nonblock_resumeinfo *nr, + __u32 len) +{ + char buf[128]; + int rc = RC_OK; -int encode_len(char *buf, int buflen, __u32 len) + if (unlikely(nr->type != RESUME_TYPE_READ)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + + if (unlikely(nr->data.read.read_discard.state != 0 && ( + nr->data.read.read_discard.fd != fd || + nr->data.read.read_discard.len != len))) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + + if (nr->data.read.read_discard.state == 1) { + goto state_1; + } else if (unlikely(nr->data.read.read_discard.state != 0)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + + nr->data.read.read_discard.fd = fd; + nr->data.read.read_discard.len = len; + nr->data.read.read_discard.discarded = 0; + + + nr->data.read.read_discard.state = 1; +state_1: + + while (len > 0) { + int rcvd; + + __u32 rcvlen = len - nr->data.read.read_discard.discarded; + if (rcvlen > 128) + rcvlen = 128; + + rcvd = recv(fd, buf, rcvlen, 0); + + if (rcvd == 0) { + rc = RC_WOULDBLOCK; + break; + } + + if (rcvd < 0) { + if (errno == EINTR) + continue; + + perror("recv"); + rc = RC_CONNBROKEN; + goto out; + } + + nr->data.read.read_discard.discarded -= rcvd; + } + +out: + if (rc != RC_WOULDBLOCK) { + bzero(&(nr->data.read.read_discard), + sizeof(nr->data.read.read_discard)); + } + + return rc; +} + +static int encode_len(char *buf, int buflen, __u32 len) { - if (buf == 0) + if (unlikely(buf == 0)) return -1; - if (buflen < 4) + if (unlikely(buflen < 4)) return -1; if (len <= 127) { @@ -175,9 +297,9 @@ int encode_len(char *buf, int buflen, __u32 len) return -1; } -int decode_len(char *buf, int buflen, __u32 *len) +static int decode_len(char *buf, int buflen, __u32 *len) { - if (buflen < 1) + if (unlikely(buflen < 1)) return 0; if ((__u8) buf[0] <= 127) { @@ -207,6 +329,89 @@ int decode_len(char *buf, int buflen, __u32 *len) return 4; } +static int read_len(int fd, struct libcor_nonblock_resumeinfo *nr, + __u32 *len, __u32 *maxread) +{ + int rc = RC_CONNBROKEN; + + if (unlikely(nr->type != RESUME_TYPE_READ)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + + if (unlikely(nr->data.read.read_len.state != 0 && ( + nr->data.read.read_len.fd != fd || + nr->data.read.read_len.len != len || + nr->data.read.read_len.maxread != maxread))) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + + if (sizeof(nr->data.read.read_len.buf) != 4) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + + if (nr->data.read.read_len.state == 1) { + goto state_1; + } else if (unlikely(nr->data.read.read_len.state != 0)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + + nr->data.read.read_len.fd = fd; + nr->data.read.read_len.len = len; + nr->data.read.read_len.maxread = maxread; + bzero(&(nr->data.read.read_len.buf[0]), 4); + nr->data.read.read_len.read = 0; + + + while(1) { + nr->data.read.read_len.state = 1; +state_1: + if (nr->data.read.read_len.read >= 4) { + printf("error in readlen: read to high\n"); + goto out_connbroken; + } + + rc = read_fully(fd, nr, &(nr->data.read.read_len.buf[0]) + + nr->data.read.read_len.read, 1, maxread); + if (rc != RC_OK) + return rc; + + nr->data.read.read_len.read++; + + rc = decode_len(&(nr->data.read.read_len.buf[0]), + nr->data.read.read_len.read, len); + if (rc > 0) { + if (unlikely(rc < nr->data.read.read_len.read)) { + printf("error in readlen: decode_len has not " + "consumed the whole buffer\n"); + goto out_connbroken; + } + rc = RC_OK; + break; + } + } + + if (0) { +out_connbroken: + rc = RC_CONNBROKEN; + } +out: + + if (rc != RC_WOULDBLOCK) { + bzero(&(nr->data.read.read_len), + sizeof(nr->data.read.read_len)); + } + + return rc; +} + static int send_cmd_hdr(int fd, __u16 cmd, __u32 len) { char cmdlenbuf[6]; @@ -222,7 +427,7 @@ static int send_cmd_hdr(int fd, __u16 cmd, __u32 len) rc = encode_len(cmdlenbuf+2, 4, len); if (rc <= 0) - return RC_ERROR; + return RC_CONNBROKEN; rc = send_fully(fd, cmdlenbuf, 2 + rc); if (rc != RC_OK) @@ -245,94 +450,112 @@ static int send_cmd(int fd, __u16 cmd, char *buf, __u32 len) return RC_OK; } -static int read_len(int fd, __u32 *len, __u32 *maxread) +int read_resp_nonblock(int fd, struct libcor_nonblock_resumeinfo *nr) { - char buf[4]; - __u32 read = 0; - - while(1) { - int rc; + int rc; - if (read >= 4) { - printf("error in readlen: read to high"); + struct libcor_nonblock_resumeinfo_resp *nr_resp = + &(nr->data.read.funcdata.resp); + + if (nr->type == RESUME_TYPE_NONE) { + bzero(nr, sizeof(struct libcor_nonblock_resumeinfo)); + nr->type = RESUME_TYPE_READ; + nr->data.read.functype = RESUME_READ_FUNC_RESP; + nr_resp->fd = fd; + } else if (unlikely(nr->type != RESUME_TYPE_READ || + nr->data.read.functype != RESUME_READ_FUNC_RESP)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } else if (unlikely(nr_resp->fd != fd)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } else { + __u8 state = nr_resp->state; + + if (state == 1) { + goto state_1; + } else if (state == 2) { + goto state_2; + } else if (state == 3) { + goto state_3; + } else if (unlikely(state != 0)) { + assert(0); + exit(1); return RC_CONNBROKEN; } + } - rc = read_fully(fd, buf + read, 1, maxread); - if (rc != RC_OK) - return rc; - - read++; + nr_resp->respcode = 0; + rc = read_fully(fd, nr, (char *) &(nr_resp->respcode), 1, 0); - rc = decode_len(buf, read, len); - if (rc > 0) { - if (rc < read) { - printf("error in readlen: decode_len has not " - "consumed the whole buffer"); - return RC_CONNBROKEN; - } - break; - } - } + if (rc != RC_OK) + return bzero_nr_iffinished(nr, rc); - return RC_OK; -} + if (nr_resp->respcode == CDR_EXECFAILED) { + nr_resp->reasoncode = 0; -static int readprint_reason(int fd) -{ - __u16 reasoncode; - char buf[4096]; - __u32 reasonlen; + nr_resp->state = 1; +state_1: + rc = read_fully(fd, nr, (char *) &(nr_resp->reasoncode), 2, 0); + if (rc != RC_OK) + return bzero_nr_iffinished(nr, rc); - int rc = read_fully(fd, (char *) &reasoncode, 2, 0); - if (rc) - return rc; + nr_resp->reasoncode = ntohs(nr_resp->reasoncode); - reasoncode = ntohs(reasoncode); + printf("execfailed: reasoncode = %d\n", (__s32) + nr_resp->reasoncode); - printf("execfailed: reasoncode = %d\n", (__s32) reasoncode); + nr_resp->state = 2; +state_2: + rc = read_len(fd, nr, &(nr_resp->reasonlen), + 0); + if (rc != RC_OK) + return bzero_nr_iffinished(nr, rc); - rc = read_len(fd, &reasonlen, 0); - if (rc != RC_OK) - return rc; + nr_resp->state = 3; +state_3: + rc = read_discard(fd, nr, nr_resp->reasonlen); + if (rc != RC_OK) + return bzero_nr_iffinished(nr, rc); - while (reasonlen > 0) { - __u32 nextread = reasonlen; + /* + while (reasonlen > 0) { + __u32 nextread = reasonlen; - if (reasonlen > 4093) - nextread = 4093; - reasonlen -= nextread; - int rc = read_fully(fd, buf, nextread, 0); + if (reasonlen > 4093) + nextread = 4093; + reasonlen -= nextread; + int rc = read_fully(fd, buf, nextread, 0); - if (rc) - return rc; + if (rc) + return rc; - buf[nextread] = 0; - printf("%s", buf); + buf[nextread] = 0; + printf("%s", buf); + } + printf("\n");*/ } - printf("\n"); - return RC_OK; + + if (nr_resp->respcode == CDR_EXECOK) + return bzero_nr_iffinished(nr, RC_OK); + + return bzero_nr_iffinished(nr, RC_CONNBROKEN); } int read_resp(int fd) { - __u8 respcode; - - int rc = read_fully(fd, (char *) &respcode, 1, 0); - if (rc) - return rc; + int rc = RC_WOULDBLOCK; + struct libcor_nonblock_resumeinfo nr; + bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo)); - if (respcode == CDR_EXECFAILED) { - rc = readprint_reason(fd); - if (rc) - return rc; - return RC_ERROR; + rc = read_resp_nonblock(fd, &nr); + if (unlikely(rc == RC_WOULDBLOCK)) { + assert(0); + rc = RC_CONNBROKEN; } - - if (respcode == CDR_EXECOK) - return RC_OK; - - return RC_CONNBROKEN; + return rc; } @@ -344,12 +567,14 @@ int send_connect_neigh(int fd, __u32 addrlen, char *addr) int rc; rc = encode_len(buf, 4, addrlen); - if (rc <= 0) - return RC_ERROR; + if (unlikely(rc <= 0)) + return RC_CONNBROKEN; len += rc; - /* printf("send_connect_neigh %d %d %c%c\n", (int) addrtypelen, (int) addrlen, (char) addrtype[0], (char) addrtype[1]); */ + /* printf("send_connect_neigh %d %d %c%c\n", (int) addrtypelen, + * (int) addrlen, (char) addrtype[0], + * (char) addrtype[1]); */ rc = send_cmd_hdr(fd, CD_CONNECT_NB, len + addrlen); if (rc != RC_OK) @@ -379,93 +604,22 @@ int send_list_neigh(int fd, __u32 limit, __u32 offset) int rc; rc = encode_len(buf, 4, limit); - if (rc <= 0) - return RC_ERROR; + if (unlikely(rc <= 0)) + return RC_CONNBROKEN; len += rc; rc = encode_len(buf + len, 4, offset); - if (rc <= 0) - return RC_ERROR; + if (unlikely(rc <= 0)) + return RC_CONNBROKEN; len += rc; return send_cmd(fd, CD_LIST_NEIGH, buf, len); } -/* int send_set_timeout(int fd, __u32 timeout, int backwards) -{ - __u32 vals[3]; - __u16 cmd; - - if (backwards) - cmd = CD_SET_BACKWARD_TIMEOUT; - else - cmd = CD_SET_FORWARD_TIMEOUT; - - *((__u16 *)( ((char *)vals) + 2)) = htons(cmd); - *((__u32 *)( ((char *)vals) + 4)) = htonl(4); / * length * / - vals[2] = htonl(timeout); - - return send_cmd(fd, ((char *) vals) + 2, 10); -} */ - - -int read_discard(int fd, __u32 len) -{ - char buf[len]; - printf("read_discard %d\n", len); - return read_fully(fd, buf, len, 0); -} - -static int read_field(int fd, __u16 field, __u32 len, __u32 *addrlen, - char **addr, char *latency) -{ - int rc; - - if (field == LIST_NEIGH_FIELD_ADDR) { - rc = read_len(fd, addrlen, &len); - if (rc != RC_OK) - return rc; - - if (*addrlen > 65536) - goto out_discard; - - *addr = malloc(*addrlen); - - rc = read_fully(fd, *addr, *addrlen, &len); - if (rc != RC_OK) - goto out_discard; - } else if (field == LIST_NEIGH_FIELD_LATENCY) { - rc = read_fully(fd, latency, 1, &len); - if (rc != RC_OK) - goto out_discard; - - printf("latency %d\n", (int) *latency); - } - -out_discard: - if (len > 0 && rc != RC_CONNBROKEN) { - rc = read_discard(fd, len); - if (rc != RC_OK) - return RC_CONNBROKEN; - } - - return rc; -} - -int field_redundant(struct listneigh_field *fields, __u32 numfields, - __u64 fieldidx) -{ - __u64 u; - for(u=0;udata.read.funcdata.neighlist); + + if (nr->type == RESUME_TYPE_NONE) { + bzero(nr, sizeof(struct libcor_nonblock_resumeinfo)); + nr->type = RESUME_TYPE_READ; + nr->data.read.functype = RESUME_READ_FUNC_NEIGHLIST; + nr_nl->fd = fd; + nr_nl->ptr = ptr; + nr_nl->init = init; + nr_nl->next_neigh = next_neigh; + } else if (unlikely(nr->type != RESUME_TYPE_READ || + nr->data.read.functype != RESUME_READ_FUNC_NEIGHLIST)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } else if (unlikely(nr_nl->fd != fd || + nr_nl->ptr != ptr || + nr_nl->init != init || + nr_nl->next_neigh != next_neigh)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } else { + __u8 state = nr_nl->state; + if (state == 1) { + goto state_1; + } else if (state == 2) { + goto state_2; + } else if (state == 3) { + goto state_3; + } else if (state == 4) { + goto state_4; + } else if (state == 5) { + goto state_5; + } else if (state == 6) { + goto state_6; + } else if (state == 7) { + goto state_7; + } else if (state == 8) { + goto state_8; + } else if (state == 9) { + goto state_9; + } else if (state ==10) { + goto state_10; + } else if (state == 11) { + goto state_11; + } else if (unlikely(state != 0)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + } - struct listneigh_field fields[256]; + nr_nl->respcode = 0; - __u64 u; - - rc = read_fully(fd, (char *) &respcode, 1, 0); + rc = read_fully(fd, nr, (char *) &(nr_nl->respcode), 1, 0); if (rc != RC_OK) - return RC_CONNBROKEN; + return bzero_nr_iffinished(nr, rc); - if (respcode != CDR_BINDATA) - return RC_CONNBROKEN; + if (unlikely(nr_nl->respcode != CDR_BINDATA)) + return bzero_nr_iffinished(nr, RC_CONNBROKEN); - rc = read_len(fd, &len, 0); + nr_nl->state = 1; +state_1: + rc = read_len(fd, nr, &(nr_nl->len), 0); if (rc != RC_OK) - return RC_CONNBROKEN; + return bzero_nr_iffinished(nr, rc); - rc = read_len(fd, &totalneighs, &len); + nr_nl->state = 2; +state_2: + rc = read_len(fd, nr, &(nr_nl->totalneighs), &(nr_nl->len)); if (rc != RC_OK) - goto out_discard; + return bzero_nr_iffinished(nr, rc); - rc = read_len(fd, &response_rows, &len); + nr_nl->state = 3; +state_3: + rc = read_len(fd, nr, &(nr_nl->response_rows), &(nr_nl->len)); if (rc != RC_OK) - goto out_discard; + return bzero_nr_iffinished(nr, rc); - rc = read_len(fd, &numfields, &len); + nr_nl->state = 4; +state_4: + rc = read_len(fd, nr, &(nr_nl->numfields), &(nr_nl->len)); if (rc != RC_OK) - goto out_discard; + return bzero_nr_iffinished(nr, rc); - if (numfields > 256) { - rc = RC_ERROR; - goto out_discard; - } + if (unlikely(nr_nl->numfields > NEIGHLIST_MAX_FIELDS)) + return bzero_nr_iffinished(nr, RC_CONNBROKEN); - for(u=0;uu=0;nr_nl->unumfields;nr_nl->u++) { + nr_nl->fields[nr_nl->u].field = 0; - rc = read_fully(fd, (char *) &(fields[u].field), 2, &len); + nr_nl->state = 5; +state_5: + rc = read_fully(fd, nr, + (char *) &(nr_nl->fields[nr_nl->u].field), + 2, &(nr_nl->len)); if (rc != RC_OK) - goto out_discard; - fields[u].field = ntohs(fields[u].field); - - rc = read_len(fd, &fieldlen, &len); + return bzero_nr_iffinished(nr, rc); + nr_nl->fields[nr_nl->u].field = + ntohs(nr_nl->fields[nr_nl->u].field); + + nr_nl->state = 6; +state_6: + nr_nl->fieldlen = 0; + rc = read_len(fd, nr, &(nr_nl->fieldlen), &(nr_nl->len)); if (rc != RC_OK) - goto out_discard; + return bzero_nr_iffinished(nr, rc); - if (fieldlen > 65535) - goto out_discard; + if (unlikely(nr_nl->fieldlen > 65535)) + return bzero_nr_iffinished(nr, RC_CONNBROKEN); - fields[u].len = (__u16) fieldlen; + nr_nl->fields[nr_nl->u].len = (__u16) nr_nl->fieldlen; + nr_nl->fieldlen = 0; } - if (field_present(fields, numfields, LIST_NEIGH_FIELD_ADDR) == 0) { - rc = RC_ERROR; - goto out_discard; - } + if (unlikely(field_present(nr_nl->fields, nr_nl->numfields, + LIST_NEIGH_FIELD_ADDR) == 0)) + return bzero_nr_iffinished(nr, RC_CONNBROKEN); - init(ptr, totalneighs, response_rows); + init(ptr, nr_nl->totalneighs, nr_nl->response_rows); - for (u=0;uu=0;nr_nl->uresponse_rows;nr_nl->u++) { + bzero(&(nr_nl->addr[0]), ADDR_MAX_LEN); + nr_nl->addrlen = 0; + nr_nl->latency = 0; - __u64 v; - for(v=0;vv=0;nr_nl->vnumfields;nr_nl->v++) { + nr_nl->fieldlen = nr_nl->fields[nr_nl->v].len; + if (nr_nl->fieldlen == 0) { + nr_nl->state = 7; +state_7: + rc = read_len(fd, nr, &(nr_nl->fieldlen), + &(nr_nl->len)); if (rc != RC_OK) - return rc; + return bzero_nr_iffinished(nr, rc); } - if (fieldlen > len) { - rc = RC_ERROR; - goto out_discard; - } + if (nr_nl->fieldlen > nr_nl->len) + return bzero_nr_iffinished(nr, RC_CONNBROKEN); - if (field_redundant(fields, numfields, v)) { - rc = read_discard(fd, fieldlen); + nr_nl->len -= nr_nl->fieldlen; + + if (field_present(nr_nl->fields, nr_nl->v, + nr_nl->fields[nr_nl->v].field)) { + goto discard_field; + } else if (nr_nl->fields[nr_nl->v].field == + LIST_NEIGH_FIELD_ADDR) { + nr_nl->state = 8; +state_8: + rc = read_len(fd, nr, &(nr_nl->addrlen), + &(nr_nl->fieldlen)); if (rc != RC_OK) + return bzero_nr_iffinished(nr, rc); + + if (unlikely(nr_nl->addrlen > ADDR_MAX_LEN)) + goto discard_field; + + if (sizeof(nr_nl->addr) != ADDR_MAX_LEN) { + assert(0); + exit(1); return RC_CONNBROKEN; - } else { - read_field(fd, fields[v].field, fieldlen, - &addrlen, &addr, &latency); + } + bzero(&(nr_nl->addr[0]), ADDR_MAX_LEN); + + nr_nl->state = 9; +state_9: + rc = read_fully(fd, nr, &(nr_nl->addr[0]), + nr_nl->addrlen, + &(nr_nl->fieldlen)); + if (rc != RC_OK) + return bzero_nr_iffinished(nr, rc); + } else if (nr_nl->fields[nr_nl->v].field == + LIST_NEIGH_FIELD_LATENCY) { + nr_nl->latency = 0; + + nr_nl->state = 10; +state_10: + rc = read_fully(fd, nr, &nr_nl->latency, 1, + &(nr_nl->fieldlen)); + if (rc != RC_OK) + return bzero_nr_iffinished(nr, rc); + + printf("latency %d\n", (int) nr_nl->latency); } - len -= fieldlen; - } +discard_field: + if (nr_nl->fieldlen > 0) { + nr_nl->state = 11; +state_11: + rc = read_discard(fd, nr, nr_nl->fieldlen); + if (rc != RC_OK) + return bzero_nr_iffinished(nr, rc); + } - next_neigh(ptr, addrlen, addr); - free(addr); - } + nr_nl->fieldlen = 0; + } -out_discard: - if (len > 0 && rc != RC_CONNBROKEN) { - int rc2 = read_discard(fd, len); - if (rc2 != RC_OK) + if (nr_nl->addrlen > ADDR_MAX_LEN) { + assert(0); + exit(1); return RC_CONNBROKEN; + } + next_neigh(ptr, nr_nl->addrlen, &(nr_nl->addr[0])); + bzero(&(nr_nl->addr[0]), ADDR_MAX_LEN); + nr_nl->addrlen = 0; + nr_nl->latency = 0; } return rc; } +int read_neigh_list(int fd, void *ptr, + void (*init)(void *ptr, __u32 totalneighs, __u32 responserows), + void (*next_neigh)(void *ptr, __u32 addrlen, char *addr)) +{ + int rc = RC_WOULDBLOCK; + struct libcor_nonblock_resumeinfo nr; + bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo)); + + rc = read_neigh_list_nonblock(fd, &nr, ptr, init, next_neigh); + if (unlikely(rc == RC_WOULDBLOCK)) { + assert(0); + rc = RC_CONNBROKEN; + } + return rc; +} + int pass_socket(int fd, __u64 cookie) { int rc; @@ -648,10 +924,10 @@ int send_rdsock_up(int fd, char *addr, __u32 addrlen) __u32 addrlenbe; if (addr == 0 && addrlen != 0) - return RC_ERROR; + return RC_CONNBROKEN; if (addrlen > 64) - return RC_ERROR; + return RC_CONNBROKEN; flags = htonll(flags); addrlenbe = htonl(addrlen); @@ -667,9 +943,6 @@ int send_rdsock_up(int fd, char *addr, __u32 addrlen) static int proc_rdsock_connect(int fd, void *ptr, __u32 cmd, char *cmddata, __u32 cmdlen, struct rdsock_opts *opts) { - if (opts->connect == 0) - return; - __u64 cookie; struct cor_sockaddr addr; @@ -699,55 +972,111 @@ static int proc_rdsock_cmd(int fd, void *ptr, __u32 cmd, char *cmddata, } } -int read_rdsock_cmd(int fd, void *ptr, struct rdsock_opts *opts) +#warning todo +static int read_rdsock_cmd_nonblock(int fd, + struct libcor_nonblock_resumeinfo *nr, + void *ptr, struct rdsock_opts *opts) { int rc; - char buf[8]; + struct libcor_nonblock_resumeinfo_rdsockcmd *nr_rd = + &(nr->data.read.funcdata.rdsock_cmd); + + if (nr->type == RESUME_TYPE_NONE) { + bzero(nr, sizeof(struct libcor_nonblock_resumeinfo)); + nr->type = RESUME_TYPE_READ; + nr->data.read.functype = RESUME_READ_FUNC_RDSOCK_CMD; + nr_rd->fd = fd; + nr_rd->ptr = ptr; + nr_rd->opts = opts; + } else if (unlikely(nr->type != RESUME_TYPE_READ || + nr->data.read.functype != RESUME_READ_FUNC_RDSOCK_CMD)){ + assert(0); + exit(1); + return RC_CONNBROKEN; + } else if (unlikely(nr_rd->fd != fd || nr_rd->ptr != ptr || + nr_rd->opts != opts)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } else { + __u8 state = nr_rd->state; + + if (state == 1) { + goto state_1; + } else if (state == 2) { + goto state_2; + } else if (unlikely(state != 0)) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } + } - __u32 cmd; - __u32 cmddatalen; - char *cmddata; + if (sizeof(nr_rd->buf) != 8) { + assert(0); + exit(1); + return RC_CONNBROKEN; + } - rc = read_fully(fd, (char *) &buf, 8, 0); + rc = read_fully(fd, nr, (char *) &(nr_rd->buf[0]), 8, 0); if (rc != RC_OK) return RC_CONNBROKEN; - ((char *) &cmd)[0] = buf[0]; - ((char *) &cmd)[1] = buf[1]; - ((char *) &cmd)[2] = buf[2]; - ((char *) &cmd)[3] = buf[3]; - ((char *) &cmddatalen)[0] = buf[4]; - ((char *) &cmddatalen)[1] = buf[5]; - ((char *) &cmddatalen)[2] = buf[6]; - ((char *) &cmddatalen)[3] = buf[7]; + ((char *) &(nr_rd->cmd))[0] = nr_rd->buf[0]; + ((char *) &(nr_rd->cmd))[1] = nr_rd->buf[1]; + ((char *) &(nr_rd->cmd))[2] = nr_rd->buf[2]; + ((char *) &(nr_rd->cmd))[3] = nr_rd->buf[3]; + ((char *) &(nr_rd->cmddatalen))[0] = nr_rd->buf[4]; + ((char *) &(nr_rd->cmddatalen))[1] = nr_rd->buf[5]; + ((char *) &(nr_rd->cmddatalen))[2] = nr_rd->buf[6]; + ((char *) &(nr_rd->cmddatalen))[3] = nr_rd->buf[7]; - cmd = ntohl(cmd); - cmddatalen = ntohl(cmddatalen); + nr_rd->cmd = ntohl(nr_rd->cmd); + nr_rd->cmddatalen = ntohl(nr_rd->cmddatalen); - if (cmddatalen > 65536) + if (nr_rd->cmddatalen > 65536) goto discard; - cmddata = malloc(cmddatalen); - if (cmddata == 0) + nr_rd->cmddata = malloc(nr_rd->cmddatalen); + if (nr_rd->cmddata == 0) goto discard; - rc = read_fully(fd, cmddata, cmddatalen, 0); + nr_rd->state = 1; +state_1: + rc = read_fully(fd, nr, nr_rd->cmddata, nr_rd->cmddatalen, 0); if (rc != RC_OK) { rc = RC_CONNBROKEN; goto out; } - proc_rdsock_cmd(fd, ptr, cmd, cmddata, cmddatalen, opts); + proc_rdsock_cmd(fd, ptr, nr_rd->cmd, nr_rd->cmddata, nr_rd->cmddatalen, + opts); out: - free(cmddata); + free(nr_rd->cmddata); if (0) { discard: - rc = read_discard(fd, cmddatalen); + nr_rd->state = 2; +state_2: + rc = read_discard(fd, nr, nr_rd->cmddatalen); if (rc != RC_OK) rc = RC_CONNBROKEN; } return rc; } + +int read_rdsock_cmd(int fd, void *ptr, struct rdsock_opts *opts) +{ + int rc = RC_WOULDBLOCK; + struct libcor_nonblock_resumeinfo nr; + bzero(&nr, sizeof(struct libcor_nonblock_resumeinfo)); + + rc = read_rdsock_cmd_nonblock(fd, &nr, ptr, opts); + if (unlikely(rc == RC_WOULDBLOCK)) { + assert(0); + rc = RC_CONNBROKEN; + } + return rc; +} diff --git a/libcor.h b/libcor.h index 76570ea..2927396 100644 --- a/libcor.h +++ b/libcor.h @@ -103,9 +103,131 @@ static inline __u64 ntohll(__u64 in) return out; } +#define RESUME_TYPE_NONE 0 +#define RESUME_TYPE_READ 1 +#define RESUME_TYPE_WRITE 2 + +#define RESUME_READ_FUNC_NONE 0 +#define RESUME_READ_FUNC_RESP 1 +#define RESUME_READ_FUNC_NEIGHLIST 2 +#define RESUME_READ_FUNC_RDSOCK_CMD 3 + +#define NEIGHLIST_MAX_FIELDS 256 +#define ADDR_MAX_LEN 1024 + +struct listneigh_field { + __u16 field; + __u16 len; +}; + +struct libcor_nonblock_resumeinfo_resp{ + int fd; + + __u8 state; + __u8 respcode; + + __u16 reasoncode; + __u32 reasonlen; +}; + +struct libcor_nonblock_resumeinfo_neighlist{ + int fd; + + void *ptr; + void (*init)(void *ptr, __u32 totalneighs, __u32 responserows); + void (*next_neigh)(void *ptr, __u32 addrlen, char *addr); + + + __u8 state; + + __u8 respcode; + __u32 len; + __u32 totalneighs; + __u32 response_rows; + __u32 numfields; + + struct listneigh_field fields[NEIGHLIST_MAX_FIELDS]; + + __u64 u; + __u64 v; + + __u32 fieldlen; + + char addr[ADDR_MAX_LEN]; + __u32 addrlen; + char latency; +}; + +struct libcor_nonblock_resumeinfo_rdsockcmd{ + int fd; + void *ptr; + struct rdsock_opts *opts; + + __u8 state; + + char buf[8]; + + __u32 cmd; + __u32 cmddatalen; + char *cmddata; +}; + +struct libcor_nonblock_resumeinfo{ + __u8 type; + + union{ + struct{ + struct{ + __u8 state; + + int fd; + char *buf; + __u32 len; + __u32 *maxread; + + __u32 totalread; + }read_fully; + + struct{ + __u8 state; + + int fd; + __u32 len; + __u32 discarded; + }read_discard; + + struct{ + __u8 state; + + int fd; + __u32 *len; + __u32 *maxread; + + char buf[4]; + __u32 read; + }read_len; + + __u8 functype; + union{ + struct libcor_nonblock_resumeinfo_resp resp; + + struct libcor_nonblock_resumeinfo_neighlist + neighlist; + + struct libcor_nonblock_resumeinfo_rdsockcmd + rdsock_cmd; + }funcdata; + }read; + + struct{ + + }write; + }data; +}; + #define RC_OK 0 -#define RC_ERROR 1 -#define RC_CONNBROKEN 2 +#define RC_CONNBROKEN 1 +#define RC_WOULDBLOCK 2 extern int read_resp(int fd); diff --git a/test_route.c b/test_route.c index 92dbb0d..a8e6506 100644 --- a/test_route.c +++ b/test_route.c @@ -54,11 +54,6 @@ struct node{ __u8 neighs_queried; }; -struct{ - struct node **nodes; - __u32 numnodes; -}node_list; - #define ROUTE_MAXDEPTH 7 struct e2e_route{ diff --git a/test_routed.c b/test_routed.c index 18843be..6860de3 100644 --- a/test_routed.c +++ b/test_routed.c @@ -59,11 +59,6 @@ struct node{ __u8 neighs_queried; }; -struct{ - struct node **nodes; - __u32 numnodes; -}node_list; - #define ROUTE_MAXDEPTH 7 struct e2e_route{ -- 2.11.4.GIT