engine: fixed args
[transsip-mirror.git] / src / engine.c
blob32e4b05a59c38d1207bb668107bf5d2a75a2de22
1 /*
2 * transsip - the telephony network
3 * By Daniel Borkmann <daniel@transsip.org>
4 * Copyright 2011, 2012 Daniel Borkmann <dborkma@tik.ee.ethz.ch>,
5 * Swiss federal institute of technology (ETH Zurich)
6 * Subject to the GPL, version 2.
7 */
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <pthread.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <assert.h>
#include <celt/celt.h>
#include <speex/speex_jitter.h>
#include <speex/speex_echo.h>

#include "built-in.h"
#include "alsa.h"
#include "die.h"
#include "xmalloc.h"
#include "xutils.h"
#include "call-notifier.h"
/* Audio and wire-format tunables shared by all engine states. */
#define SAMPLING_RATE	48000	/* handed to celt_mode_create() and alsa_open() */
#define FRAME_SIZE	256	/* samples per audio frame */
#define PACKETSIZE	43	/* encoded payload bytes carried per datagram */
#define CHANNELS	1	/* channel count for CELT and ALSA */
#define MAX_MSG		1500	/* size of the datagram scratch buffers */
/* NOTE(review): reuses the <limits.h> name; here it only sizes the sound-file path buffer. */
#define PATH_MAX	512
/*
 * Header placed at the start of every transsip datagram: a 32-bit
 * sequence field followed by a single byte of flag bits.
 */
struct transsip_hdr {
	uint32_t seq;			/* filled with htonl() by the speaking state */
	__extension__ uint8_t est:1;	/* set on the ring probe and on accept */
	__extension__ uint8_t psh:1;	/* set together with est once a call carries audio */
	__extension__ uint8_t bsy:1;	/* set when a call is refused */
	__extension__ uint8_t fin:1;	/* set when a call is torn down */
	__extension__ uint8_t res1:4;	/* remaining bits, only printed by the debug decoder */
} __attribute__((packed));
47 enum engine_state_num {
48 ENGINE_STATE_IDLE = CALL_STATE_MACHINE_IDLE,
49 ENGINE_STATE_CALLOUT = CALL_STATE_MACHINE_CALLOUT,
50 ENGINE_STATE_CALLIN = CALL_STATE_MACHINE_CALLIN,
51 ENGINE_STATE_SPEAKING = CALL_STATE_MACHINE_SPEAKING,
52 __ENGINE_STATE_MAX,
/* Selects which sound file engine_play_file() plays. */
enum engine_sound_type {
	ENGINE_SOUND_DIAL = 0,
	ENGINE_SOUND_RING = 1,
	ENGINE_SOUND_BUSY = 2,
};
61 struct engine_state {
62 volatile enum engine_state_num state;
63 enum engine_state_num (*process)(int, int *, int, int,
64 struct alsa_dev *);
/* Brace initialiser for one state-table entry: state @s handled by @f. */
#define STATE_MAP_SET(s, f)	{ .state = (s), .process = (f) }
/* Loop-termination flag; defined outside this file. */
extern sig_atomic_t quit;

/* engine_main() waits for this to become non-zero before opening its socket. */
sig_atomic_t stun_done = 0;

/* Hard-wired defaults (marked XXX upstream). */
static char *alsadev = "plughw:0,0";	/* device name passed to alsa_open() */
static char *port = "30111";		/* service passed to getaddrinfo() for the listening socket */
/*
 * The single call the engine tracks: the socket used to reach the peer,
 * the peer's address, and whether the call is established.
 */
struct engine_curr {
	int active;		/* 1 once a call is established, 0 otherwise */
	int sock;		/* socket used to talk to the current peer */
	/*
	 * Peer address. Code keeps using `addr` as a struct sockaddr; the
	 * unnamed union only reserves room behind it, so that copying an
	 * address longer than struct sockaddr (e.g. a struct sockaddr_in6
	 * returned by getaddrinfo()) to &addr stays inside this object
	 * instead of overwriting addrlen and whatever follows ecurr.
	 */
	__extension__ union {
		struct sockaddr addr;
		struct sockaddr_storage addr_space;
	};
	socklen_t addrlen;	/* number of valid bytes stored at &addr */
};

static struct engine_curr ecurr;
88 static void engine_play_file(struct alsa_dev *dev, enum engine_sound_type type)
90 int fd, nfds;
91 char path[PATH_MAX];
92 short pcm[FRAME_SIZE * CHANNELS];
93 struct pollfd *pfds = NULL;
95 memset(path, 0, sizeof(path));
96 switch (type) {
97 case ENGINE_SOUND_DIAL:
98 slprintf(path, sizeof(path), "%s/%s", FILE_ETCDIR, FILE_DIAL);
99 break;
100 case ENGINE_SOUND_BUSY:
101 slprintf(path, sizeof(path), "%s/%s", FILE_ETCDIR, FILE_BUSY);
102 break;
103 case ENGINE_SOUND_RING:
104 slprintf(path, sizeof(path), "%s/%s", FILE_ETCDIR, FILE_RING);
105 break;
108 fd = open(path, O_RDONLY);
109 if (fd < 0)
110 panic("Cannot open ring file!\n");
112 alsa_start(dev);
114 nfds = alsa_nfds(dev);
115 pfds = xmalloc(sizeof(*pfds) * nfds);
117 alsa_getfds(dev, pfds, nfds);
119 memset(pcm, 0, sizeof(pcm));
120 while (read(fd, pcm, sizeof(pcm)) > 0) {
121 poll(pfds, nfds, -1);
123 if (alsa_play_ready(dev, pfds, nfds))
124 alsa_write(dev, pcm, FRAME_SIZE);
125 memset(pcm, 0, sizeof(pcm));
127 if (alsa_cap_ready(dev, pfds, nfds))
128 alsa_read(dev, pcm, FRAME_SIZE);
129 memset(pcm, 0, sizeof(pcm));
132 alsa_stop(dev);
133 close(fd);
134 xfree(pfds);
137 static inline void engine_play_ring(struct alsa_dev *dev)
139 engine_play_file(dev, ENGINE_SOUND_RING);
142 static inline void engine_play_busy(struct alsa_dev *dev)
144 engine_play_file(dev, ENGINE_SOUND_BUSY);
147 static inline void engine_play_dial(struct alsa_dev *dev)
149 engine_play_file(dev, ENGINE_SOUND_DIAL);
152 static void engine_decode_packet(uint8_t *pkt, size_t len)
154 struct transsip_hdr *hdr;
156 if (len < sizeof(*hdr)) {
157 whine("[dbg] pkt too small!\n");
158 return;
161 hdr = (struct transsip_hdr *) pkt;
162 whine("[dbg] packet:\n");
163 whine("[dbg] seq: %d\n", hdr->seq);
164 whine("[dbg] est: %d\n", hdr->est);
165 whine("[dbg] psh: %d\n", hdr->psh);
166 whine("[dbg] bsy: %d\n", hdr->bsy);
167 whine("[dbg] fin: %d\n", hdr->fin);
168 whine("[dbg] res1: %d\n", hdr->res1);
/*
 * CALLOUT state: place a call to the peer named by the cli.
 *
 * Reads a struct cli_pkt from @usocki, resolves cpkt.address/cpkt.port,
 * connects a UDP socket (stored in *@csock and ecurr.sock) and sends a
 * header with est set. It then waits, in up to 100 rounds of at most
 * 1.5s each with the dial sound played in between, for one of:
 *   - est+psh from the peer   -> ENGINE_STATE_SPEAKING
 *   - bsy from the peer       -> busy sound, ENGINE_STATE_IDLE
 *   - fin from the cli        -> ENGINE_STATE_IDLE
 *
 * @ssock and @usocko are not used by this state.
 *
 * Returns the next engine state. On every failure after the socket was
 * created the socket is closed and *@csock is set to -1.
 */
static enum engine_state_num engine_do_callout(int ssock, int *csock, int usocki,
					       int usocko, struct alsa_dev *dev)
{
	int one, mtu, tries, i;
	ssize_t ret;
	struct cli_pkt cpkt;
	struct addrinfo hints, *ahead, *ai;
	char msg[MAX_MSG];
	struct pollfd fds[2];
	struct sockaddr raddr;
	socklen_t raddrlen;
	struct transsip_hdr *thdr;

	assert(ecurr.active == 0);

	memset(&cpkt, 0, sizeof(cpkt));
	ret = read(usocki, &cpkt, sizeof(cpkt));
	if (ret != sizeof(cpkt)) {
		whine("Read error from cli!\n");
		return ENGINE_STATE_IDLE;
	}
	if (cpkt.ring == 0) {
		whine("Read no ring flag from cli!\n");
		return ENGINE_STATE_IDLE;
	}

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = IPPROTO_UDP;
	hints.ai_flags = AI_NUMERICSERV;

	/* getaddrinfo() reports failure with any non-zero value. */
	ret = getaddrinfo(cpkt.address, cpkt.port, &hints, &ahead);
	if (ret != 0) {
		whine("Cannot get address info for %s:%s!\n",
		      cpkt.address, cpkt.port);
		return ENGINE_STATE_IDLE;
	}

	*csock = -1;

	for (ai = ahead; ai != NULL && *csock < 0; ai = ai->ai_next) {
		/*
		 * The address is copied into ecurr.addr below; skip
		 * candidates that would not fit rather than overrun it.
		 */
		if (ai->ai_addrlen > sizeof(ecurr.addr))
			continue;

		*csock = socket(ai->ai_family, ai->ai_socktype,
				ai->ai_protocol);
		if (*csock < 0)
			continue;

		ret = connect(*csock, ai->ai_addr, ai->ai_addrlen);
		if (ret < 0) {
			whine("Cannot connect to remote!\n");
			close(*csock);
			*csock = -1;
			continue;
		}

		one = 1;
		setsockopt(*csock, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));

		mtu = IP_PMTUDISC_DONT;
		setsockopt(*csock, SOL_IP, IP_MTU_DISCOVER, &mtu, sizeof(mtu));

		memcpy(&ecurr.addr, ai->ai_addr, ai->ai_addrlen);
		ecurr.addrlen = ai->ai_addrlen;
		ecurr.sock = *csock;
		ecurr.active = 0;
	}
	freeaddrinfo(ahead);

	if (*csock < 0) {
		whine("Cannot connect to server!\n");
		return ENGINE_STATE_IDLE;
	}

	tries = 0;

	/* Ring probe: a bare header with only est set. */
	memset(msg, 0, sizeof(msg));
	thdr = (struct transsip_hdr *) msg;
	thdr->est = 1;

	ret = sendto(*csock, msg, sizeof(*thdr), 0, &ecurr.addr,
		     ecurr.addrlen);
	if (ret <= 0) {
		whine("Cannot send ring probe to server!\n");
		goto out_err;
	}

	fds[0].fd = *csock;
	fds[0].events = POLLIN;
	fds[1].fd = usocki;
	fds[1].events = POLLIN;

	while (!quit && tries++ < 100) {
		/* On failure revents is not updated; do not act on stale bits. */
		if (poll(fds, array_size(fds), 1500) < 0)
			continue;

		for (i = 0; i < array_size(fds); ++i) {
			if ((fds[i].revents & POLLERR) == POLLERR) {
				printf("Destination unreachable?\n");
				goto out_err;
			}
			if ((fds[i].revents & POLLIN) != POLLIN)
				continue;

			if (fds[i].fd == usocki) {
				/* Same full-record check as the initial read. */
				ret = read(usocki, &cpkt, sizeof(cpkt));
				if (ret != sizeof(cpkt)) {
					whine("Read error from cli!\n");
					continue;
				}
				if (cpkt.fin) {
					whine("User aborted call!\n");
					goto out_err;
				}
			}

			if (fds[i].fd == *csock) {
				memset(msg, 0, sizeof(msg));
				raddrlen = sizeof(raddr);
				ret = recvfrom(*csock, msg, sizeof(msg), 0,
					       &raddr, &raddrlen);
				if (ret <= 0)
					continue;

				engine_decode_packet((uint8_t *) msg, ret);

				/*
				 * recvfrom() may report a length larger than
				 * the buffer when it truncated the address;
				 * such a sender cannot be compared safely.
				 */
				if (raddrlen > sizeof(raddr))
					continue;
				if (raddrlen != ecurr.addrlen)
					continue;
				if (memcmp(&raddr, &ecurr.addr, raddrlen))
					continue;

				thdr = (struct transsip_hdr *) msg;
				if (thdr->est == 1 && thdr->psh == 1) {
					ecurr.active = 1;
					whine("Call established!\n");
					return ENGINE_STATE_SPEAKING;
				}
				if (thdr->bsy == 1) {
					whine("Remote end busy!\n");
					engine_play_busy(dev);
					engine_play_busy(dev);
					goto out_err;
				}
			}
		}

		engine_play_dial(dev);
	}

out_err:
	close(*csock);
	/* -1, not 0: descriptor 0 is a valid (stdin) descriptor. */
	*csock = -1;
	return ENGINE_STATE_IDLE;
}
324 static enum engine_state_num engine_do_callin(int ssock, int *csock, int usocki,
325 int usocko, struct alsa_dev *dev)
327 int i;
328 ssize_t ret;
329 char msg[MAX_MSG];
330 struct sockaddr raddr;
331 socklen_t raddrlen;
332 struct transsip_hdr *thdr;
333 char hbuff[256], sbuff[256];
334 struct pollfd fds[2];
335 struct cli_pkt cpkt;
337 assert(ecurr.active == 0);
339 memset(&msg, 0, sizeof(msg));
340 raddrlen = sizeof(raddr);
341 ret = recvfrom(ssock, msg, sizeof(msg), 0, &raddr, &raddrlen);
342 if (ret <= 0) {
343 whine("Receive error %s!\n", strerror(errno));
344 return ENGINE_STATE_IDLE;
347 thdr = (struct transsip_hdr *) msg;
348 if (thdr->est != 1)
349 return ENGINE_STATE_IDLE;
351 memcpy(&ecurr.addr, &raddr, raddrlen);
352 ecurr.addrlen = raddrlen;
353 ecurr.sock = ssock;
354 ecurr.active = 0;
356 memset(hbuff, 0, sizeof(hbuff));
357 memset(sbuff, 0, sizeof(sbuff));
358 getnameinfo((struct sockaddr *) &raddr, raddrlen, hbuff, sizeof(hbuff),
359 sbuff, sizeof(sbuff), NI_NUMERICHOST | NI_NUMERICSERV);
361 printf("New incoming connection from %s:%s!\n", hbuff, sbuff);
362 printf("Answer it with: take\n");
363 printf("Reject it with: hangup\n");
364 fflush(stdout);
366 fds[0].fd = ssock;
367 fds[0].events = POLLIN;
368 fds[1].fd = usocki;
369 fds[1].events = POLLIN;
371 while (likely(!quit)) {
372 poll(fds, array_size(fds), 1500);
374 for (i = 0; i < array_size(fds); ++i) {
375 if ((fds[i].revents & POLLIN) != POLLIN)
376 continue;
378 if (fds[i].fd == ssock) {
379 memset(msg, 0, sizeof(msg));
380 raddrlen = sizeof(raddr);
381 ret = recvfrom(ssock, msg, sizeof(msg), 0,
382 &raddr, &raddrlen);
383 if (ret <= 0)
384 continue;
385 if (raddrlen != ecurr.addrlen)
386 continue;
387 if (memcmp(&raddr, &ecurr.addr, raddrlen))
388 continue;
390 if (thdr->fin == 1) {
391 whine("Remote end hung up!\n");
392 engine_play_busy(dev);
393 engine_play_busy(dev);
394 goto out_err;
399 if (fds[i].fd == usocki) {
400 ret = read(usocki, &cpkt, sizeof(cpkt));
401 if (ret <= 0) {
402 whine("Error reading from cli!\n");
403 continue;
405 if (cpkt.fin) {
406 memset(&msg, 0, sizeof(msg));
407 thdr = (struct transsip_hdr *) msg;
408 thdr->bsy = 1;
410 sendto(ssock, msg, sizeof(*thdr), 0,
411 &ecurr.addr, ecurr.addrlen);
413 whine("You aborted call!\n");
414 goto out_err;
416 if (cpkt.take) {
417 memset(&msg, 0, sizeof(msg));
418 thdr = (struct transsip_hdr *) msg;
419 thdr->est = 1;
420 thdr->psh = 1;
422 ret = sendto(ssock, msg, sizeof(*thdr), 0,
423 &ecurr.addr, ecurr.addrlen);
424 if (ret <= 0) {
425 whine("Error sending ack!\n");
426 goto out_err;
429 ecurr.active = 1;
430 whine("Call established!\n");
431 return ENGINE_STATE_SPEAKING;
436 engine_play_ring(dev);
439 out_err:
440 return ENGINE_STATE_IDLE;
443 static enum engine_state_num engine_do_speaking(int ssock, int *csock,
444 int usocki, int usocko,
445 struct alsa_dev *dev)
447 ssize_t ret;
448 int recv_started = 0, nfds = 0, tmp, i;
449 struct pollfd *pfds = NULL;
450 char msg[MAX_MSG];
451 uint32_t send_seq = 0;
452 CELTMode *mode;
453 CELTEncoder *encoder;
454 CELTDecoder *decoder;
455 JitterBuffer *jitter;
456 struct sockaddr raddr;
457 struct transsip_hdr *thdr;
458 socklen_t raddrlen;
459 struct cli_pkt cpkt;
461 assert(ecurr.active == 1);
463 mode = celt_mode_create(SAMPLING_RATE, FRAME_SIZE, NULL);
464 encoder = celt_encoder_create(mode, CHANNELS, NULL);
465 decoder = celt_decoder_create(mode, CHANNELS, NULL);
467 jitter = jitter_buffer_init(FRAME_SIZE);
468 tmp = FRAME_SIZE;
469 jitter_buffer_ctl(jitter, JITTER_BUFFER_SET_MARGIN, &tmp);
471 nfds = alsa_nfds(dev);
472 pfds = xmalloc(sizeof(*pfds) * (nfds + 2));
474 alsa_getfds(dev, pfds, nfds);
476 pfds[nfds].fd = ecurr.sock;
477 pfds[nfds].events = POLLIN;
478 pfds[nfds + 1].fd = usocki;
479 pfds[nfds + 1].events = POLLIN;
481 alsa_start(dev);
483 while (likely(!quit)) {
484 poll(pfds, nfds + 2, -1);
486 if (pfds[nfds + 1].revents & POLLIN) {
487 ret = read(usocki, &cpkt, sizeof(cpkt));
488 if (ret <= 0) {
489 whine("Read error from cli!\n");
490 continue;
492 if (cpkt.fin) {
493 whine("You aborted call!\n");
494 goto out_err;
498 if (pfds[nfds].revents & POLLIN) {
499 JitterBufferPacket packet;
501 memset(msg, 0, sizeof(msg));
502 raddrlen = sizeof(raddr);
503 ret = recvfrom(ecurr.sock, msg, sizeof(msg), 0,
504 &raddr, &raddrlen);
505 if (unlikely(ret <= 0))
506 continue;
508 if (raddrlen != ecurr.addrlen ||
509 memcmp(&raddr, &ecurr.addr, raddrlen)) {
510 memset(msg, 0, sizeof(msg));
512 thdr = (struct transsip_hdr *) msg;
513 thdr->bsy = 1;
515 sendto(ecurr.sock, msg, sizeof(*thdr), 0,
516 &raddr, raddrlen);
518 goto out_alsa;
521 thdr = (struct transsip_hdr *) msg;
522 if (thdr->fin == 1) {
523 whine("Remote end hung up!\n");
524 goto out_err;
527 packet.data = msg + sizeof(*thdr);
528 packet.len = ret - sizeof(*thdr);
529 packet.timestamp = ntohl(thdr->seq);
530 packet.span = FRAME_SIZE;
531 packet.sequence = 0;
533 jitter_buffer_put(jitter, &packet);
534 recv_started = 1;
536 out_alsa:
537 if (alsa_play_ready(dev, pfds, nfds)) {
538 short pcm[FRAME_SIZE * CHANNELS];
540 if (recv_started) {
541 JitterBufferPacket packet;
543 memset(msg, 0, sizeof(msg));
544 packet.data = msg;
545 packet.len = MAX_MSG;
547 jitter_buffer_tick(jitter);
548 jitter_buffer_get(jitter, &packet,
549 FRAME_SIZE, NULL);
550 if (packet.len == 0)
551 packet.data = NULL;
553 celt_decode(decoder, (const unsigned char *)
554 packet.data, packet.len, pcm);
555 } else {
556 for (i = 0; i < FRAME_SIZE * CHANNELS; ++i)
557 pcm[i] = 0;
560 alsa_write(dev, pcm, FRAME_SIZE);
563 if (alsa_cap_ready(dev, pfds, nfds)) {
564 short pcm[FRAME_SIZE * CHANNELS];
566 alsa_read(dev, pcm, FRAME_SIZE);
568 memset(msg, 0, sizeof(msg));
569 thdr = (struct transsip_hdr *) msg;
571 celt_encode(encoder, pcm, NULL, (unsigned char *)
572 (msg + sizeof(*thdr)), PACKETSIZE);
574 thdr->psh = 1;
575 thdr->est = 1;
576 thdr->seq = htonl(send_seq);
577 send_seq += FRAME_SIZE;
579 ret = sendto(ecurr.sock, msg,
580 PACKETSIZE + sizeof(*thdr), 0,
581 &ecurr.addr, ecurr.addrlen);
582 if (ret <= 0) {
583 whine("Send datagram failed!\n");
584 goto out_err;
589 out_err:
590 alsa_stop(dev);
592 memset(msg, 0, sizeof(msg));
593 thdr = (struct transsip_hdr *) msg;
594 thdr->fin = 1;
596 sendto(ecurr.sock, msg, sizeof(*thdr), 0, &ecurr.addr,
597 ecurr.addrlen);
599 if (ecurr.sock == *csock) {
600 close(*csock);
601 *csock = 0;
604 celt_encoder_destroy(encoder);
605 celt_decoder_destroy(decoder);
606 celt_mode_destroy(mode);
608 jitter_buffer_destroy(jitter);
610 xfree(pfds);
612 ecurr.active = 0;
613 return ENGINE_STATE_IDLE;
616 static inline void engine_drop_from_queue(int sock)
618 char msg[MAX_MSG];
619 recv(sock, msg, sizeof(msg), 0);
622 static enum engine_state_num engine_do_idle(int ssock, int *csock, int usocki,
623 int usocko, struct alsa_dev *dev)
625 int i;
626 ssize_t ret;
627 struct pollfd fds[2];
628 char msg[MAX_MSG];
629 struct transsip_hdr *thdr;
630 struct cli_pkt cpkt;
632 assert(ecurr.active == 0);
634 fds[0].fd = ssock;
635 fds[0].events = POLLIN;
636 fds[1].fd = usocki;
637 fds[1].events = POLLIN;
639 while (likely(!quit)) {
640 memset(msg, 0, sizeof(msg));
642 poll(fds, array_size(fds), -1);
643 for (i = 0; i < array_size(fds); ++i) {
644 if ((fds[i].revents & POLLIN) != POLLIN)
645 continue;
647 if (fds[i].fd == ssock) {
648 ret = recv(ssock, msg, sizeof(msg), MSG_PEEK);
649 if (ret < 0)
650 continue;
651 if (ret < sizeof(struct transsip_hdr))
652 engine_drop_from_queue(ssock);
654 thdr = (struct transsip_hdr *) msg;
655 if (thdr->est == 1)
656 return ENGINE_STATE_CALLIN;
657 else
658 engine_drop_from_queue(ssock);
661 if (fds[i].fd == usocki) {
662 ret = read(usocki, &cpkt, sizeof(cpkt));
663 if (ret <= 0) {
664 whine("Error reading from cli!\n");
665 continue;
667 if (cpkt.ring)
668 return ENGINE_STATE_CALLOUT;
673 return ENGINE_STATE_IDLE;
676 struct engine_state state_machine[__ENGINE_STATE_MAX] __read_mostly = {
677 STATE_MAP_SET(ENGINE_STATE_IDLE, engine_do_idle),
678 STATE_MAP_SET(ENGINE_STATE_CALLOUT, engine_do_callout),
679 STATE_MAP_SET(ENGINE_STATE_CALLIN, engine_do_callin),
680 STATE_MAP_SET(ENGINE_STATE_SPEAKING, engine_do_speaking),
/*
 * Engine thread entry point.
 *
 * @arg: struct pipepair; pp->i is the descriptor cli commands are read
 *       from, pp->o is passed to the state handlers as usocko.
 *
 * Waits until stun_done is set, binds a UDP socket on `port`, opens the
 * ALSA device and then runs the state machine until quit is set: each
 * round announces the current state through the call notifier and calls
 * that state's handler, whose return value is the next state. Panics if
 * address lookup, socket setup or opening the ALSA device fails.
 * Terminates the thread with pthread_exit(0).
 */
void *engine_main(void *arg)
{
	/*
	 * csock starts at -1: after an incoming call engine_do_speaking()
	 * compares it against the call socket without anything having
	 * assigned it yet.
	 */
	int ssock = -1, csock = -1, ret, mtu, usocki, usocko;
	enum engine_state_num state;
	struct addrinfo hints, *ahead, *ai;
	struct alsa_dev *dev = NULL;
	struct pipepair *pp = arg;

	init_call_notifier();

	usocki = pp->i;
	usocko = pp->o;

	while (!stun_done)
		sleep(0);

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = IPPROTO_UDP;
	hints.ai_flags = AI_PASSIVE;

	/* getaddrinfo() reports failure with any non-zero value. */
	ret = getaddrinfo(NULL, port, &hints, &ahead);
	if (ret != 0)
		panic("Cannot get address info!\n");

	/* Take the first candidate that can be created, configured and bound. */
	for (ai = ahead; ai != NULL && ssock < 0; ai = ai->ai_next) {
		ssock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
		if (ssock < 0)
			continue;
		if (ai->ai_family == AF_INET6) {
#ifdef IPV6_V6ONLY
			int one = 1;

			ret = setsockopt(ssock, IPPROTO_IPV6, IPV6_V6ONLY,
					 &one, sizeof(one));
			if (ret < 0) {
				close(ssock);
				ssock = -1;
				continue;
			}
#else
			close(ssock);
			ssock = -1;
			continue;
#endif /* IPV6_V6ONLY */
		}

		mtu = IP_PMTUDISC_DONT;
		setsockopt(ssock, SOL_IP, IP_MTU_DISCOVER, &mtu, sizeof(mtu));

		ret = bind(ssock, ai->ai_addr, ai->ai_addrlen);
		if (ret < 0) {
			close(ssock);
			ssock = -1;
			continue;
		}
	}

	freeaddrinfo(ahead);
	if (ssock < 0)
		panic("Cannot open socket!\n");

	dev = alsa_open(alsadev, SAMPLING_RATE, CHANNELS, FRAME_SIZE);
	if (!dev)
		panic("Cannot open ALSA device %s!\n", alsadev);

	state = ENGINE_STATE_IDLE;
	while (likely(!quit)) {
		/* Own name: the original inner `arg` shadowed the parameter. */
		int cur_state = state;

		call_notifier_exec(CALL_STATE_MACHINE_CHANGED, &cur_state);
		state = state_machine[state].process(ssock, &csock,
						     usocki, usocko,
						     dev);
	}

	alsa_close(dev);
	close(ssock);

	pthread_exit(0);
}