nmdb: Add LevelDB support
[nmdb.git] / nmdb / tipc.c
blob2a21351a5d09cae4012f3897c2a7ee429261a268
2 #include <sys/types.h> /* socket defines */
3 #include <sys/socket.h> /* socket functions */
4 #include <stdlib.h> /* malloc() */
5 #include <linux/tipc.h> /* tipc stuff */
6 #include <stdint.h> /* uint32_t and friends */
7 #include <arpa/inet.h> /* htonls() and friends */
8 #include <string.h> /* memcpy() */
9 #include <unistd.h> /* close() */
11 #include "tipc.h"
12 #include "common.h"
13 #include "net-const.h"
14 #include "req.h"
15 #include "parse.h"
16 #include "log.h"
20 * Miscelaneous helper functions
23 static void rep_send_error(const struct req_info *req, const unsigned int code)
25 int r, c;
26 unsigned char minibuf[3 * 4];
28 if (settings.passive)
29 return;
31 /* Network format: ID (4), REP_ERR (4), error code (4) */
32 r = htonl(REP_ERR);
33 c = htonl(code);
34 memcpy(minibuf, &(req->id), 4);
35 memcpy(minibuf + 4, &r, 4);
36 memcpy(minibuf + 8, &c, 4);
38 /* If this send fails, there's nothing to be done */
39 r = sendto(req->fd, minibuf, 3 * 4, 0, req->clisa, req->clilen);
41 if (r < 0) {
42 errlog("rep_send_error() failed");
47 static int rep_send(const struct req_info *req, const unsigned char *buf,
48 const size_t size)
50 int rv;
52 if (settings.passive)
53 return 1;
55 rv = sendto(req->fd, buf, size, 0, req->clisa, req->clilen);
56 if (rv < 0) {
57 rep_send_error(req, ERR_SEND);
58 return 0;
60 return 1;
64 /* Send small replies, consisting in only a value. */
65 static void tipc_reply_mini(const struct req_info *req, uint32_t reply)
67 /* We use a mini buffer to speedup the small replies, to avoid the
68 * malloc() overhead. */
69 unsigned char minibuf[8];
71 if (settings.passive)
72 return;
74 reply = htonl(reply);
75 memcpy(minibuf, &(req->id), 4);
76 memcpy(minibuf + 4, &reply, 4);
77 rep_send(req, minibuf, 8);
78 return;
82 /* The tipc_reply_* functions are used by the db code to send the network
83 * replies. */
85 static void tipc_reply_err(const struct req_info *req, uint32_t reply)
87 rep_send_error(req, reply);
90 static void tipc_reply_long(const struct req_info *req, uint32_t reply,
91 unsigned char *val, size_t vsize)
93 if (val == NULL) {
94 /* miss */
95 tipc_reply_mini(req, reply);
96 } else {
97 unsigned char *buf;
98 size_t bsize;
99 uint32_t t;
101 reply = htonl(reply);
103 /* The reply length is:
104 * 4 id
105 * 4 reply code
106 * 4 vsize
107 * vsize val
109 bsize = 4 + 4 + 4 + vsize;
110 buf = malloc(bsize);
112 t = htonl(vsize);
114 memcpy(buf, &(req->id), 4);
115 memcpy(buf + 4, &reply, 4);
116 memcpy(buf + 8, &t, 4);
117 memcpy(buf + 12, val, vsize);
119 rep_send(req, buf, bsize);
120 free(buf);
122 return;
128 * Main functions for receiving and parsing
131 int tipc_init(void)
133 int fd, rv;
134 struct sockaddr_tipc srvsa;
136 srvsa.family = AF_TIPC;
137 if (settings.tipc_lower == settings.tipc_upper)
138 srvsa.addrtype = TIPC_ADDR_NAME;
139 else
140 srvsa.addrtype = TIPC_ADDR_NAMESEQ;
142 srvsa.addr.nameseq.type = TIPC_SERVER_TYPE;
143 srvsa.addr.nameseq.lower = settings.tipc_lower;
144 srvsa.addr.nameseq.upper = settings.tipc_upper;
145 srvsa.scope = TIPC_CLUSTER_SCOPE;
147 fd = socket(AF_TIPC, SOCK_RDM, 0);
148 if (fd < 0)
149 return -1;
151 rv = bind(fd, (struct sockaddr *) &srvsa, sizeof(srvsa));
152 if (rv < 0) {
153 close(fd);
154 return -1;
157 return fd;
161 void tipc_close(int fd)
163 close(fd);
167 /* Static common buffer to avoid unnecessary allocations.
168 * Originally, this was malloc()ed, but making it static made it go from 27
169 * usec for each set operation, to 23 usec: it made test1 go from 3.213s to
170 * 2.345s for 37618 operations.
171 * Allocate enough to hold the max msg length of 64kbytes. */
172 #define SBSIZE (68 * 1024)
173 static unsigned char static_buf[SBSIZE];
175 /* Called by libevent for each receive event */
176 void tipc_recv(int fd, short event, void *arg)
178 int rv;
179 struct req_info req;
180 struct sockaddr_tipc clisa;
181 socklen_t clilen;
183 clilen = sizeof(clisa);
185 rv = recvfrom(fd, static_buf, SBSIZE, 0, (struct sockaddr *) &clisa,
186 &clilen);
187 if (rv <= 0) {
188 /* rv == 0 means "return of an undeliverable message", which
189 * we ignore; -1 means other error. */
190 goto exit;
193 if (rv < 8) {
194 stats.net_broken_req++;
195 goto exit;
198 stats.msg_tipc++;
200 req.fd = fd;
201 req.type = REQTYPE_TIPC;
202 req.clisa = (struct sockaddr *) &clisa;
203 req.clilen = clilen;
204 req.reply_mini = tipc_reply_mini;
205 req.reply_err = tipc_reply_err;
206 req.reply_long = tipc_reply_long;
208 /* parse the message */
209 parse_message(&req, static_buf, rv);
211 exit:
212 return;