Update
[transsip-mirror.git] / src / engine.c
blob078a55401017b64a8770753a459d0e141a25a880
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <pthread.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <celt/celt.h>
#include <speex/speex_jitter.h>
#include <speex/speex_echo.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <assert.h>

#include "built-in.h"
#include "alsa.h"
#include "die.h"
#include "xmalloc.h"
#include "xutils.h"

/* Audio and datagram sizing used throughout the engine. */
#define SAMPLING_RATE	48000	/* rate given to ALSA, CELT and the echo canceller */
#define FRAME_SIZE	256	/* samples handled per audio frame */
#define PACKETSIZE	43	/* bytes of encoded audio sent per frame */
#define CHANNELS	1	/* audio channel count */
#define MAX_MSG		1500	/* size of the datagram scratch buffers */
#define PATH_MAX	512	/* size of the sound-file path buffer */

/*
 * Header placed at the start of every datagram exchanged by this engine:
 * a 32-bit sequence field followed by one byte of signalling flags.
 */
struct transsip_hdr {
	/* Written with htonl(); audio packets advance it by FRAME_SIZE. */
	uint32_t seq;
	__extension__ uint8_t
		est:1,	/* set in the ring probe, the answer and audio packets */
		psh:1,	/* with est: call accepted / packet carries audio */
		bsy:1,	/* call rejected or callee engaged */
		fin:1,	/* sender hangs up */
		res1:4;	/* not used in this file */
} __attribute__((packed));

/* Engine states; each value is also the index of its row in state_machine[]. */
enum engine_state_num {
	ENGINE_STATE_IDLE	= 0,	/* waiting for a ring or a call request */
	ENGINE_STATE_CALLOUT	= 1,	/* ringing a remote peer */
	ENGINE_STATE_CALLIN	= 2,	/* being rung by a remote peer */
	ENGINE_STATE_SPEAKING	= 3,	/* call established, audio flowing */
	__ENGINE_STATE_MAX	= 4,	/* number of states, sizes state_machine[] */
};

/* Signal tones that engine_play_file() knows how to locate and play. */
enum engine_sound_type {
	ENGINE_SOUND_DIAL	= 0,	/* played while waiting for the callee */
	ENGINE_SOUND_RING	= 1,	/* played while an incoming call is pending */
	ENGINE_SOUND_BUSY	= 2,	/* played when the other side rejects or hangs up */
};

52 struct engine_state {
53 volatile enum engine_state_num state;
54 enum engine_state_num (*process)(int, int *, int, int,
55 struct alsa_dev *);
/* Braced initializer for one struct engine_state row: state st, handler fn. */
#define STATE_MAP_SET(st, fn)	{ .state = (st), .process = (fn) }

/* Shutdown flag defined outside this file; every engine loop tests it. */
extern sig_atomic_t quit;

/* Device name handed to alsa_open(); hard-coded for now. */
static char *alsadev = "plughw:0,0"; //XXX
/* Service/port the listening UDP socket is bound to; hard-coded for now. */
static char *port = "30111"; //XXX

/*
 * Book-keeping for the one call that is being set up or is in progress.
 *
 * NOTE(review): addr is a plain struct sockaddr, which on common platforms
 * is smaller than an IPv6 socket address -- confirm whether IPv6 peers are
 * meant to be supported before widening it, since every user passes
 * &ecurr.addr straight to the socket calls.
 */
struct engine_curr {
	int active;		/* 1 while a call is established, else 0 */
	int sock;		/* socket used to reach the peer */
	struct sockaddr addr;	/* peer address */
	socklen_t addrlen;	/* number of bytes of addr in use */
};

/* The engine's single call slot; zero-initialized as a static object. */
static struct engine_curr ecurr;

77 static void engine_play_file(struct alsa_dev *dev, enum engine_sound_type type)
79 int fd;
80 char path[PATH_MAX];
81 short pcm[FRAME_SIZE * CHANNELS];
83 memset(path, 0, sizeof(path));
84 switch (type) {
85 case ENGINE_SOUND_DIAL:
86 slprintf(path, sizeof(path), "%s/%s", FILE_ETCDIR, FILE_DIAL);
87 break;
88 case ENGINE_SOUND_BUSY:
89 slprintf(path, sizeof(path), "%s/%s", FILE_ETCDIR, FILE_BUSY);
90 break;
91 case ENGINE_SOUND_RING:
92 slprintf(path, sizeof(path), "%s/%s", FILE_ETCDIR, FILE_RING);
93 break;
96 fd = open(path, O_RDONLY);
97 if (fd < 0)
98 panic("Cannot open ring file!\n");
100 memset(pcm, 0, sizeof(pcm));
102 alsa_start(dev);
104 while (read(fd, pcm, sizeof(pcm)) > 0) {
105 alsa_write(dev, pcm, FRAME_SIZE);
106 alsa_read(dev, pcm, FRAME_SIZE);
107 memset(pcm, 0, sizeof(pcm));
110 alsa_stop(dev);
112 close(fd);
115 static inline void engine_play_ring(struct alsa_dev *dev)
117 engine_play_file(dev, ENGINE_SOUND_RING);
120 static inline void engine_play_busy(struct alsa_dev *dev)
122 engine_play_file(dev, ENGINE_SOUND_BUSY);
125 static inline void engine_play_dial(struct alsa_dev *dev)
127 engine_play_file(dev, ENGINE_SOUND_DIAL);
/*
 * CALLOUT state handler: ring a remote peer and wait for its answer.
 *
 * Reads the call request (address and port) from usocki, opens a UDP
 * socket to the first resolved address that can be connected, sends a
 * header with est set as ring probe and then waits, playing the dial tone
 * between polls, for at most 100 rounds.
 *
 * @ssock:  listening socket; anyone ringing us meanwhile is told "busy"
 * @csock:  out: socket connected to the callee, or -1 when no call results
 * @usocki: command pipe from the user interface (struct cli_pkt records)
 * @usocko: unused here
 * @dev:    ALSA device used for the dial and busy tones
 *
 * Returns ENGINE_STATE_SPEAKING once the callee answered with est and psh
 * set (ecurr.active becomes 1), ENGINE_STATE_IDLE in every other case.
 *
 * NOTE(review): engine_do_idle() has already consumed one cli_pkt before
 * returning ENGINE_STATE_CALLOUT, so the read below waits for a further
 * record -- confirm that the command line sends the request twice.
 */
static enum engine_state_num engine_do_callout(int ssock, int *csock, int usocki,
					       int usocko, struct alsa_dev *dev)
{
	int one, mtu, tries, i;
	ssize_t ret;
	struct cli_pkt cpkt;
	struct addrinfo hints, *ahead, *ai;
	char msg[MAX_MSG];
	struct pollfd fds[3];
	/* Large enough for any address family the kernel may report. */
	struct sockaddr_storage raddr;
	socklen_t raddrlen;
	struct transsip_hdr *thdr;

	assert(ecurr.active == 0);

	whine("In Callout!\n");

	memset(&cpkt, 0, sizeof(cpkt));
	ret = read(usocki, &cpkt, sizeof(cpkt));
	if (ret != sizeof(cpkt)) {
		whine("Error receiving packet from cmdline!\n");
		return ENGINE_STATE_IDLE;
	}
	if (cpkt.ring == 0)
		return ENGINE_STATE_IDLE;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = IPPROTO_UDP;
	hints.ai_flags = AI_NUMERICSERV;

	/* getaddrinfo() reports failure with any non-zero value. */
	ret = getaddrinfo(cpkt.address, cpkt.port, &hints, &ahead);
	if (ret != 0) {
		whine("Cannot get address info for %s:%s!\n",
		      cpkt.address, cpkt.port);
		return ENGINE_STATE_IDLE;
	}

	*csock = -1;

	for (ai = ahead; ai != NULL && *csock < 0; ai = ai->ai_next) {
		/* Never copy more than ecurr.addr can hold. */
		if (ai->ai_addrlen > sizeof(ecurr.addr))
			continue;

		*csock = socket(ai->ai_family, ai->ai_socktype,
				ai->ai_protocol);
		if (*csock < 0)
			continue;

		ret = connect(*csock, ai->ai_addr, ai->ai_addrlen);
		if (ret < 0) {
			whine("Cannot connect to remote!\n");
			close(*csock);
			*csock = -1;
			continue;
		}

		one = 1;
		setsockopt(*csock, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one));

		mtu = IP_PMTUDISC_DONT;
		setsockopt(*csock, SOL_IP, IP_MTU_DISCOVER, &mtu, sizeof(mtu));

		memcpy(&ecurr.addr, ai->ai_addr, ai->ai_addrlen);
		ecurr.addrlen = ai->ai_addrlen;
		ecurr.sock = *csock;
		ecurr.active = 0;
	}

	freeaddrinfo(ahead);

	if (*csock < 0) {
		whine("Cannot connect to server!\n");
		return ENGINE_STATE_IDLE;
	}

	tries = 0;

	/* Ring probe: a bare header with only est set. */
	memset(msg, 0, sizeof(msg));
	thdr = (struct transsip_hdr *) msg;
	thdr->est = 1;

	ret = sendto(*csock, msg, sizeof(*thdr), 0, &ecurr.addr,
		     ecurr.addrlen);
	if (ret <= 0) {
		whine("Cannot send ring probe to server!\n");
		goto out_err;
	}

	fds[0].fd = *csock;
	fds[0].events = POLLIN;
	fds[1].fd = ssock;
	fds[1].events = POLLIN;
	fds[2].fd = usocki;
	fds[2].events = POLLIN;

	while (!quit && tries++ < 100) {
		/* Only look at revents when poll() reports ready descriptors. */
		if (poll(fds, array_size(fds), 1500) <= 0) {
			engine_play_dial(dev);
			continue;
		}

		for (i = 0; i < array_size(fds); ++i) {
			if ((fds[i].revents & POLLIN) != POLLIN)
				continue;

			if (fds[i].fd == usocki) {
				ret = read(usocki, &cpkt, sizeof(cpkt));
				if (ret <= 0)
					continue;
				if (cpkt.fin) {
					whine("User aborted call!\n");
					goto out_err;
				}
			}

			if (fds[i].fd == ssock) {
				/* Somebody rings us while we ring: answer busy. */
				memset(msg, 0, sizeof(msg));
				/* The length argument is in/out: seed it. */
				raddrlen = sizeof(raddr);
				ret = recvfrom(ssock, msg, sizeof(msg), 0,
					       (struct sockaddr *) &raddr,
					       &raddrlen);
				if (ret <= 0)
					continue;

				memset(msg, 0, sizeof(msg));
				thdr = (struct transsip_hdr *) msg;
				thdr->bsy = 1;

				sendto(ssock, msg, sizeof(*thdr), 0,
				       (struct sockaddr *) &raddr, raddrlen);
			}

			if (fds[i].fd == *csock) {
				memset(msg, 0, sizeof(msg));
				raddrlen = sizeof(raddr);
				ret = recvfrom(*csock, msg, sizeof(msg), 0,
					       (struct sockaddr *) &raddr,
					       &raddrlen);
				if (ret <= 0)
					continue;
				/* Ignore anything not sent by the callee. */
				if (raddrlen != ecurr.addrlen)
					continue;
				if (memcmp(&raddr, &ecurr.addr, raddrlen))
					continue;

				thdr = (struct transsip_hdr *) msg;
				if (thdr->est == 1 && thdr->psh == 1) {
					ecurr.active = 1;
					whine("Call established!\n");
					return ENGINE_STATE_SPEAKING;
				}
				if (thdr->bsy == 1) {
					whine("Remote end busy!\n");
					engine_play_busy(dev);
					engine_play_busy(dev);
					goto out_err;
				}
			}
		}

		engine_play_dial(dev);
	}

out_err:
	close(*csock);
	/* -1, not 0: descriptor 0 is a valid descriptor. */
	*csock = -1;
	return ENGINE_STATE_IDLE;
}

290 static enum engine_state_num engine_do_callin(int ssock, int *csock, int usocki,
291 int usocko, struct alsa_dev *dev)
293 int i;
294 ssize_t ret;
295 char msg[MAX_MSG];
296 struct sockaddr raddr;
297 socklen_t raddrlen;
298 struct transsip_hdr *thdr;
299 char hbuff[256], sbuff[256];
300 struct pollfd fds[2];
301 struct cli_pkt cpkt;
303 assert(ecurr.active == 0);
305 whine("In Callin!\n");
307 memset(&msg, 0, sizeof(msg));
308 ret = recvfrom(ssock, msg, sizeof(msg), 0, &raddr, &raddrlen);
309 if (ret <= 0) {
310 whine("error 1\n");
311 return ENGINE_STATE_IDLE;
314 thdr = (struct transsip_hdr *) msg;
315 if (thdr->est != 1) {
316 whine("error 2\n");
317 return ENGINE_STATE_IDLE;
320 memcpy(&ecurr.addr, &raddr, raddrlen);
321 ecurr.addrlen = raddrlen;
322 ecurr.sock = ssock;
323 ecurr.active = 0;
325 memset(hbuff, 0, sizeof(hbuff));
326 memset(sbuff, 0, sizeof(sbuff));
327 getnameinfo((struct sockaddr *) &raddr, raddrlen, hbuff, sizeof(hbuff),
328 sbuff, sizeof(sbuff), NI_NUMERICHOST | NI_NUMERICSERV);
330 printf("New incoming connection from %s:%s!\n", hbuff, sbuff);
331 printf("Answer it with: take\n");
332 printf("Reject it with: hangup\n");
333 fflush(stdout);
335 fds[0].fd = ssock;
336 fds[0].events = POLLIN;
337 fds[1].fd = usocki;
338 fds[1].events = POLLIN;
340 while (likely(!quit)) {
341 poll(fds, array_size(fds), 1500);
343 for (i = 0; i < array_size(fds); ++i) {
344 if ((fds[i].revents & POLLIN) != POLLIN)
345 continue;
347 if (fds[i].fd == ssock) {
348 memset(msg, 0, sizeof(msg));
349 ret = recvfrom(ssock, msg, sizeof(msg), 0,
350 &raddr, &raddrlen);
351 if (ret <= 0)
352 continue;
353 if (raddrlen != ecurr.addrlen)
354 continue;
355 if (memcmp(&raddr, &ecurr.addr, raddrlen))
356 continue;
358 if (thdr->fin == 1) {
359 whine("Remote end hung up!\n");
360 engine_play_busy(dev);
361 engine_play_busy(dev);
362 goto out_err;
367 if (fds[i].fd == usocki) {
368 ret = read(usocki, &cpkt, sizeof(cpkt));
369 if (ret <= 0)
370 continue;
371 if (cpkt.fin) {
372 memset(&msg, 0, sizeof(msg));
373 thdr = (struct transsip_hdr *) msg;
374 thdr->bsy = 1;
376 sendto(ssock, msg, sizeof(*thdr), 0,
377 &ecurr.addr, ecurr.addrlen);
379 whine("You aborted call!\n");
380 goto out_err;
382 if (cpkt.take) {
383 memset(&msg, 0, sizeof(msg));
384 thdr = (struct transsip_hdr *) msg;
385 thdr->est = 1;
386 thdr->psh = 1;
388 ret = sendto(ssock, msg, sizeof(*thdr), 0,
389 &ecurr.addr, ecurr.addrlen);
390 if (ret <= 0) {
391 whine("Error sending ack!\n");
392 goto out_err;
395 ecurr.active = 1;
396 whine("Call established!\n");
397 return ENGINE_STATE_SPEAKING;
402 engine_play_ring(dev);
405 out_err:
406 return ENGINE_STATE_IDLE;
409 static enum engine_state_num engine_do_speaking(int ssock, int *csock,
410 int usocki, int usocko,
411 struct alsa_dev *dev)
413 ssize_t ret;
414 int recv_started = 0, nfds = 0, tmp, i;
415 struct pollfd *pfds = NULL;
416 char msg[MAX_MSG];
417 uint32_t send_seq = 0;
418 CELTMode *mode;
419 CELTEncoder *encoder;
420 CELTDecoder *decoder;
421 JitterBuffer *jitter;
422 SpeexEchoState *echostate;
423 struct sockaddr raddr;
424 struct transsip_hdr *thdr;
425 socklen_t raddrlen;
426 struct cli_pkt cpkt;
428 assert(ecurr.active == 1);
430 whine("In Speaking!\n");
432 mode = celt_mode_create(SAMPLING_RATE, FRAME_SIZE, NULL);
433 encoder = celt_encoder_create(mode, CHANNELS, NULL);
434 decoder = celt_decoder_create(mode, CHANNELS, NULL);
436 jitter = jitter_buffer_init(FRAME_SIZE);
437 tmp = FRAME_SIZE;
438 jitter_buffer_ctl(jitter, JITTER_BUFFER_SET_MARGIN, &tmp);
440 echostate = speex_echo_state_init(FRAME_SIZE, 10 * FRAME_SIZE);
441 tmp = SAMPLING_RATE;
442 speex_echo_ctl(echostate, SPEEX_ECHO_SET_SAMPLING_RATE, &tmp);
444 nfds = alsa_nfds(dev);
445 pfds = xmalloc(sizeof(*pfds) * (nfds + 2));
447 alsa_getfds(dev, pfds, nfds);
449 pfds[nfds].fd = ecurr.sock;
450 pfds[nfds].events = POLLIN;
451 pfds[nfds + 1].fd = usocki;
452 pfds[nfds + 1].events = POLLIN;
454 alsa_start(dev);
456 while (likely(!quit)) {
457 poll(pfds, nfds + 2, -1);
459 if (pfds[nfds + 1].revents & POLLIN) {
460 ret = read(usocki, &cpkt, sizeof(cpkt));
461 if (ret <= 0)
462 continue;
463 if (cpkt.fin) {
464 whine("You aborted call!\n");
465 goto out_err;
469 if (pfds[nfds].revents & POLLIN) {
470 JitterBufferPacket packet;
472 memset(msg, 0, sizeof(msg));
473 ret = recvfrom(ecurr.sock, msg, sizeof(msg), 0,
474 &raddr, &raddrlen);
475 if (unlikely(ret <= 0))
476 goto out_err;
477 if (raddrlen != ecurr.addrlen ||
478 memcmp(&raddr, &ecurr.addr, raddrlen)) {
479 memset(msg, 0, sizeof(msg));
481 thdr = (struct transsip_hdr *) msg;
482 thdr->bsy = 1;
484 sendto(ecurr.sock, msg, sizeof(*thdr), 0,
485 &raddr, raddrlen);
487 goto out_alsa;
490 thdr = (struct transsip_hdr *) msg;
491 if (thdr->fin == 1 || thdr->psh == 0)
492 goto out_err;
494 packet.data = msg + sizeof(*thdr);
495 packet.len = ret - sizeof(*thdr);
496 packet.timestamp = ntohl(thdr->seq);
497 packet.span = FRAME_SIZE;
498 packet.sequence = 0;
500 jitter_buffer_put(jitter, &packet);
501 recv_started = 1;
503 out_alsa:
504 if (alsa_play_ready(dev, pfds, nfds)) {
505 short pcm[FRAME_SIZE * CHANNELS];
507 if (recv_started) {
508 JitterBufferPacket packet;
510 memset(msg, 0, sizeof(msg));
511 packet.data = msg;
512 packet.len = MAX_MSG;
514 jitter_buffer_tick(jitter);
515 jitter_buffer_get(jitter, &packet,
516 FRAME_SIZE, NULL);
517 if (packet.len == 0)
518 packet.data = NULL;
520 celt_decode(decoder, (const unsigned char *)
521 packet.data, packet.len, pcm);
522 } else {
523 for (i = 0; i < FRAME_SIZE * CHANNELS; ++i)
524 pcm[i] = 0;
527 alsa_write(dev, pcm, FRAME_SIZE);
528 speex_echo_state_reset(echostate);
529 speex_echo_playback(echostate, pcm);
532 if (alsa_cap_ready(dev, pfds, nfds)) {
533 short pcm[FRAME_SIZE * CHANNELS];
534 short pcm2[FRAME_SIZE * CHANNELS];
536 alsa_read(dev, pcm, FRAME_SIZE);
538 speex_echo_capture(echostate, pcm, pcm2);
539 for (i = 0; i < FRAME_SIZE * CHANNELS; ++i)
540 pcm[i] = pcm2[i];
542 memset(msg, 0, sizeof(msg));
543 thdr = (struct transsip_hdr *) msg;
545 celt_encode(encoder, pcm, NULL, (unsigned char *)
546 (msg + sizeof(*thdr)), PACKETSIZE);
548 thdr->psh = 1;
549 thdr->est = 1;
550 thdr->seq = htonl(send_seq);
551 send_seq += FRAME_SIZE;
553 ret = sendto(ecurr.sock, msg,
554 PACKETSIZE + sizeof(*thdr), 0,
555 &ecurr.addr, ecurr.addrlen);
556 if (ret <= 0) {
557 whine("Send datagram failed!\n");
558 goto out_err;
563 out_err:
564 alsa_stop(dev);
566 memset(msg, 0, sizeof(msg));
567 thdr = (struct transsip_hdr *) msg;
568 thdr->fin = 1;
570 sendto(ecurr.sock, msg, sizeof(*thdr), 0, &ecurr.addr,
571 ecurr.addrlen);
573 if (ecurr.sock == *csock) {
574 close(*csock);
575 *csock = 0;
578 celt_encoder_destroy(encoder);
579 celt_decoder_destroy(decoder);
580 celt_mode_destroy(mode);
582 jitter_buffer_destroy(jitter);
583 speex_echo_state_destroy(echostate);
585 xfree(pfds);
587 ecurr.active = 0;
588 return ENGINE_STATE_IDLE;
591 static enum engine_state_num engine_do_idle(int ssock, int *csock, int usocki,
592 int usocko, struct alsa_dev *dev)
594 int i;
595 ssize_t ret;
596 struct pollfd fds[2];
597 char msg[MAX_MSG];
598 struct transsip_hdr *thdr;
599 struct cli_pkt cpkt;
601 assert(ecurr.active == 0);
603 whine("In Idle!\n");
605 fds[0].fd = ssock;
606 fds[0].events = POLLIN;
607 fds[1].fd = usocki;
608 fds[1].events = POLLIN;
610 while (likely(!quit)) {
611 memset(msg, 0, sizeof(msg));
613 poll(fds, array_size(fds), -1);
614 for (i = 0; i < array_size(fds); ++i) {
615 if ((fds[i].revents & POLLIN) != POLLIN)
616 continue;
618 if (fds[i].fd == ssock) {
619 ret = recv(ssock, msg, sizeof(msg), MSG_PEEK);
620 if (ret <= 0)
621 continue;
622 thdr = (struct transsip_hdr *) msg;
623 if (thdr->est == 1)
624 return ENGINE_STATE_CALLIN;
627 if (fds[i].fd == usocki) {
628 ret = read(usocki, &cpkt, sizeof(cpkt));
629 if (ret <= 0)
630 continue;
631 if (cpkt.ring)
632 return ENGINE_STATE_CALLOUT;
637 return ENGINE_STATE_IDLE;
640 struct engine_state state_machine[__ENGINE_STATE_MAX] __read_mostly = {
641 STATE_MAP_SET(ENGINE_STATE_IDLE, engine_do_idle),
642 STATE_MAP_SET(ENGINE_STATE_CALLOUT, engine_do_callout),
643 STATE_MAP_SET(ENGINE_STATE_CALLIN, engine_do_callin),
644 STATE_MAP_SET(ENGINE_STATE_SPEAKING, engine_do_speaking),
/*
 * Thread entry point of the call engine.
 *
 * Binds a UDP socket to the configured port, opens the ALSA device and
 * then runs the state machine, starting in ENGINE_STATE_IDLE, until quit
 * is set. Panics when no socket can be bound or the audio device cannot
 * be opened.
 *
 * @arg: struct pipepair with the command pipe ends; ->i is read by the
 *       state handlers, ->o is passed to them as well
 *
 * Does not return to its caller: the thread ends through pthread_exit().
 */
void *engine_main(void *arg)
{
	/* csock starts out invalid; handlers compare and close it later. */
	int ssock = -1, csock = -1, ret, mtu, usocki, usocko;
	enum engine_state_num state;
	struct addrinfo hints, *ahead, *ai;
	struct alsa_dev *dev = NULL;
	struct pipepair *pp = arg;

	usocki = pp->i;
	usocko = pp->o;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = PF_UNSPEC;
	hints.ai_socktype = SOCK_DGRAM;
	hints.ai_protocol = IPPROTO_UDP;
	hints.ai_flags = AI_PASSIVE;

	/* getaddrinfo() reports failure with any non-zero value. */
	ret = getaddrinfo(NULL, port, &hints, &ahead);
	if (ret != 0)
		panic("Cannot get address info!\n");

	/* Take the first candidate address that can be set up and bound. */
	for (ai = ahead; ai != NULL && ssock < 0; ai = ai->ai_next) {
		ssock = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
		if (ssock < 0)
			continue;
		if (ai->ai_family == AF_INET6) {
			int one = 1;
#ifdef IPV6_V6ONLY
			ret = setsockopt(ssock, IPPROTO_IPV6, IPV6_V6ONLY,
					 &one, sizeof(one));
			if (ret < 0) {
				close(ssock);
				ssock = -1;
				continue;
			}
#else
			/* IPv6 sockets are skipped when V6ONLY is unavailable. */
			close(ssock);
			ssock = -1;
			continue;
#endif /* IPV6_V6ONLY */
		}

		mtu = IP_PMTUDISC_DONT;
		setsockopt(ssock, SOL_IP, IP_MTU_DISCOVER, &mtu, sizeof(mtu));

		ret = bind(ssock, ai->ai_addr, ai->ai_addrlen);
		if (ret < 0) {
			close(ssock);
			ssock = -1;
			continue;
		}
	}

	freeaddrinfo(ahead);
	if (ssock < 0)
		panic("Cannot open socket!\n");

	dev = alsa_open(alsadev, SAMPLING_RATE, CHANNELS, FRAME_SIZE);
	if (!dev)
		panic("Cannot open ALSA device %s!\n", alsadev);

	state = ENGINE_STATE_IDLE;
	while (likely(!quit)) {
		/* Each handler returns the state to run next. */
		state = state_machine[state].process(ssock, &csock,
						     usocki, usocko,
						     dev);
	}

	alsa_close(dev);
	close(ssock);

	pthread_exit(0);
}