2 * transsip - the telephony network
3 * By Daniel Borkmann <daniel@transsip.org>
4 * Copyright 2011, 2012 Daniel Borkmann <dborkma@tik.ee.ethz.ch>,
5 * Swiss federal institute of technology (ETH Zurich)
6 * Subject to the GPL, version 2.
16 #include <celt/celt.h>
17 #include <speex/speex_jitter.h>
18 #include <speex/speex_echo.h>
19 #include <netinet/in.h>
20 #include <arpa/inet.h>
29 #include "call-notifier.h"
31 #define SAMPLING_RATE 48000
32 #define FRAME_SIZE 256
40 __extension__
uint8_t est
:1,
45 } __attribute__((packed
));
47 enum engine_state_num
{
48 ENGINE_STATE_IDLE
= CALL_STATE_MACHINE_IDLE
,
49 ENGINE_STATE_CALLOUT
= CALL_STATE_MACHINE_CALLOUT
,
50 ENGINE_STATE_CALLIN
= CALL_STATE_MACHINE_CALLIN
,
51 ENGINE_STATE_SPEAKING
= CALL_STATE_MACHINE_SPEAKING
,
55 enum engine_sound_type
{
56 ENGINE_SOUND_DIAL
= 0,
62 volatile enum engine_state_num state
;
63 enum engine_state_num (*process
)(int, int *, int, int,
67 #define STATE_MAP_SET(s, f) { \
72 extern sig_atomic_t quit
;
74 sig_atomic_t stun_done
= 0;
76 static char *alsadev
= "plughw:0,0"; //XXX
77 static char *port
= "30111"; //XXX
86 static struct engine_curr ecurr
;
88 static void engine_play_file(struct alsa_dev
*dev
, enum engine_sound_type type
)
92 short pcm
[FRAME_SIZE
* CHANNELS
];
93 struct pollfd
*pfds
= NULL
;
95 memset(path
, 0, sizeof(path
));
97 case ENGINE_SOUND_DIAL
:
98 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_DIAL
);
100 case ENGINE_SOUND_BUSY
:
101 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_BUSY
);
103 case ENGINE_SOUND_RING
:
104 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_RING
);
108 fd
= open(path
, O_RDONLY
);
110 panic("Cannot open ring file!\n");
114 nfds
= alsa_nfds(dev
);
115 pfds
= xmalloc(sizeof(*pfds
) * nfds
);
117 alsa_getfds(dev
, pfds
, nfds
);
119 memset(pcm
, 0, sizeof(pcm
));
120 while (read(fd
, pcm
, sizeof(pcm
)) > 0) {
121 poll(pfds
, nfds
, -1);
123 if (alsa_play_ready(dev
, pfds
, nfds
))
124 alsa_write(dev
, pcm
, FRAME_SIZE
);
125 memset(pcm
, 0, sizeof(pcm
));
127 if (alsa_cap_ready(dev
, pfds
, nfds
))
128 alsa_read(dev
, pcm
, FRAME_SIZE
);
129 memset(pcm
, 0, sizeof(pcm
));
137 static inline void engine_play_ring(struct alsa_dev
*dev
)
139 engine_play_file(dev
, ENGINE_SOUND_RING
);
142 static inline void engine_play_busy(struct alsa_dev
*dev
)
144 engine_play_file(dev
, ENGINE_SOUND_BUSY
);
147 static inline void engine_play_dial(struct alsa_dev
*dev
)
149 engine_play_file(dev
, ENGINE_SOUND_DIAL
);
152 static void engine_decode_packet(uint8_t *pkt
, size_t len
)
154 struct transsip_hdr
*hdr
;
156 if (len
< sizeof(*hdr
)) {
157 whine("[dbg] pkt too small!\n");
161 hdr
= (struct transsip_hdr
*) pkt
;
162 whine("[dbg] packet:\n");
163 whine("[dbg] seq: %d\n", hdr
->seq
);
164 whine("[dbg] est: %d\n", hdr
->est
);
165 whine("[dbg] psh: %d\n", hdr
->psh
);
166 whine("[dbg] bsy: %d\n", hdr
->bsy
);
167 whine("[dbg] fin: %d\n", hdr
->fin
);
168 whine("[dbg] res1: %d\n", hdr
->res1
);
171 static enum engine_state_num
engine_do_callout(int ssock
, int *csock
, int usocki
,
172 int usocko
, struct alsa_dev
*dev
)
174 int one
, mtu
, tries
, i
;
177 struct addrinfo hints
, *ahead
, *ai
;
179 struct pollfd fds
[2];
180 struct sockaddr raddr
;
182 struct transsip_hdr
*thdr
;
184 assert(ecurr
.active
== 0);
186 memset(&cpkt
, 0, sizeof(cpkt
));
187 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
188 if (ret
!= sizeof(cpkt
)) {
189 whine("Read error from cli!\n");
190 return ENGINE_STATE_IDLE
;
192 if (cpkt
.ring
== 0) {
193 whine("Read no ring flag from cli!\n");
194 return ENGINE_STATE_IDLE
;
197 memset(&hints
, 0, sizeof(hints
));
198 hints
.ai_family
= PF_UNSPEC
;
199 hints
.ai_socktype
= SOCK_DGRAM
;
200 hints
.ai_protocol
= IPPROTO_UDP
;
201 hints
.ai_flags
= AI_NUMERICSERV
;
203 ret
= getaddrinfo(cpkt
.address
, cpkt
.port
, &hints
, &ahead
);
205 whine("Cannot get address info for %s:%s!\n",
206 cpkt
.address
, cpkt
.port
);
207 return ENGINE_STATE_IDLE
;
212 for (ai
= ahead
; ai
!= NULL
&& *csock
< 0; ai
= ai
->ai_next
) {
213 *csock
= socket(ai
->ai_family
, ai
->ai_socktype
,
218 ret
= connect(*csock
, ai
->ai_addr
, ai
->ai_addrlen
);
220 whine("Cannot connect to remote!\n");
227 setsockopt(*csock
, SOL_SOCKET
, SO_KEEPALIVE
, &one
, sizeof(one
));
229 mtu
= IP_PMTUDISC_DONT
;
230 setsockopt(*csock
, SOL_IP
, IP_MTU_DISCOVER
, &mtu
, sizeof(mtu
));
232 memcpy(&ecurr
.addr
, ai
->ai_addr
, ai
->ai_addrlen
);
233 ecurr
.addrlen
= ai
->ai_addrlen
;
240 whine("Cannot connect to server!\n");
241 return ENGINE_STATE_IDLE
;
246 memset(msg
, 0, sizeof(msg
));
247 thdr
= (struct transsip_hdr
*) msg
;
250 ret
= sendto(*csock
, msg
, sizeof(*thdr
), 0, &ecurr
.addr
,
253 whine("Cannot send ring probe to server!\n");
258 fds
[0].events
= POLLIN
;
260 fds
[1].events
= POLLIN
;
262 while (!quit
&& tries
++ < 100) {
263 poll(fds
, array_size(fds
), 1500);
265 for (i
= 0; i
< array_size(fds
); ++i
) {
266 if ((fds
[i
].revents
& POLLERR
) == POLLERR
) {
267 printf("Destination unreachable?\n");
270 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
273 if (fds
[i
].fd
== usocki
) {
274 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
276 whine("Read error from cli!\n");
280 whine("User aborted call!\n");
285 if (fds
[i
].fd
== *csock
) {
286 memset(msg
, 0, sizeof(msg
));
287 raddrlen
= sizeof(raddr
);
288 ret
= recvfrom(*csock
, msg
, sizeof(msg
), 0,
293 engine_decode_packet((uint8_t *) msg
, ret
);
295 if (raddrlen
!= ecurr
.addrlen
)
297 if (memcmp(&raddr
, &ecurr
.addr
, raddrlen
))
300 thdr
= (struct transsip_hdr
*) msg
;
301 if (thdr
->est
== 1 && thdr
->psh
== 1) {
303 whine("Call established!\n");
304 return ENGINE_STATE_SPEAKING
;
306 if (thdr
->bsy
== 1) {
307 whine("Remote end busy!\n");
308 engine_play_busy(dev
);
309 engine_play_busy(dev
);
315 engine_play_dial(dev
);
321 return ENGINE_STATE_IDLE
;
324 static enum engine_state_num
engine_do_callin(int ssock
, int *csock
, int usocki
,
325 int usocko
, struct alsa_dev
*dev
)
330 struct sockaddr raddr
;
332 struct transsip_hdr
*thdr
;
333 char hbuff
[256], sbuff
[256];
334 struct pollfd fds
[2];
337 assert(ecurr
.active
== 0);
339 memset(&msg
, 0, sizeof(msg
));
340 raddrlen
= sizeof(raddr
);
341 ret
= recvfrom(ssock
, msg
, sizeof(msg
), 0, &raddr
, &raddrlen
);
343 whine("Receive error %s!\n", strerror(errno
));
344 return ENGINE_STATE_IDLE
;
347 thdr
= (struct transsip_hdr
*) msg
;
349 return ENGINE_STATE_IDLE
;
351 memcpy(&ecurr
.addr
, &raddr
, raddrlen
);
352 ecurr
.addrlen
= raddrlen
;
356 memset(hbuff
, 0, sizeof(hbuff
));
357 memset(sbuff
, 0, sizeof(sbuff
));
358 getnameinfo((struct sockaddr
*) &raddr
, raddrlen
, hbuff
, sizeof(hbuff
),
359 sbuff
, sizeof(sbuff
), NI_NUMERICHOST
| NI_NUMERICSERV
);
361 printf("New incoming connection from %s:%s!\n", hbuff
, sbuff
);
362 printf("Answer it with: take\n");
363 printf("Reject it with: hangup\n");
367 fds
[0].events
= POLLIN
;
369 fds
[1].events
= POLLIN
;
371 while (likely(!quit
)) {
372 poll(fds
, array_size(fds
), 1500);
374 for (i
= 0; i
< array_size(fds
); ++i
) {
375 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
378 if (fds
[i
].fd
== ssock
) {
379 memset(msg
, 0, sizeof(msg
));
380 raddrlen
= sizeof(raddr
);
381 ret
= recvfrom(ssock
, msg
, sizeof(msg
), 0,
385 if (raddrlen
!= ecurr
.addrlen
)
387 if (memcmp(&raddr
, &ecurr
.addr
, raddrlen
))
390 if (thdr
->fin
== 1) {
391 whine("Remote end hung up!\n");
392 engine_play_busy(dev
);
393 engine_play_busy(dev
);
399 if (fds
[i
].fd
== usocki
) {
400 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
402 whine("Error reading from cli!\n");
406 memset(&msg
, 0, sizeof(msg
));
407 thdr
= (struct transsip_hdr
*) msg
;
410 sendto(ssock
, msg
, sizeof(*thdr
), 0,
411 &ecurr
.addr
, ecurr
.addrlen
);
413 whine("You aborted call!\n");
417 memset(&msg
, 0, sizeof(msg
));
418 thdr
= (struct transsip_hdr
*) msg
;
422 ret
= sendto(ssock
, msg
, sizeof(*thdr
), 0,
423 &ecurr
.addr
, ecurr
.addrlen
);
425 whine("Error sending ack!\n");
430 whine("Call established!\n");
431 return ENGINE_STATE_SPEAKING
;
436 engine_play_ring(dev
);
440 return ENGINE_STATE_IDLE
;
443 static enum engine_state_num
engine_do_speaking(int ssock
, int *csock
,
444 int usocki
, int usocko
,
445 struct alsa_dev
*dev
)
448 int recv_started
= 0, nfds
= 0, tmp
, i
;
449 struct pollfd
*pfds
= NULL
;
451 uint32_t send_seq
= 0;
453 CELTEncoder
*encoder
;
454 CELTDecoder
*decoder
;
455 JitterBuffer
*jitter
;
456 struct sockaddr raddr
;
457 struct transsip_hdr
*thdr
;
461 assert(ecurr
.active
== 1);
463 mode
= celt_mode_create(SAMPLING_RATE
, FRAME_SIZE
, NULL
);
464 encoder
= celt_encoder_create(mode
, CHANNELS
, NULL
);
465 decoder
= celt_decoder_create(mode
, CHANNELS
, NULL
);
467 jitter
= jitter_buffer_init(FRAME_SIZE
);
469 jitter_buffer_ctl(jitter
, JITTER_BUFFER_SET_MARGIN
, &tmp
);
471 nfds
= alsa_nfds(dev
);
472 pfds
= xmalloc(sizeof(*pfds
) * (nfds
+ 2));
474 alsa_getfds(dev
, pfds
, nfds
);
476 pfds
[nfds
].fd
= ecurr
.sock
;
477 pfds
[nfds
].events
= POLLIN
;
478 pfds
[nfds
+ 1].fd
= usocki
;
479 pfds
[nfds
+ 1].events
= POLLIN
;
483 while (likely(!quit
)) {
484 poll(pfds
, nfds
+ 2, -1);
486 if (pfds
[nfds
+ 1].revents
& POLLIN
) {
487 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
489 whine("Read error from cli!\n");
493 whine("You aborted call!\n");
498 if (pfds
[nfds
].revents
& POLLIN
) {
499 JitterBufferPacket packet
;
501 memset(msg
, 0, sizeof(msg
));
502 raddrlen
= sizeof(raddr
);
503 ret
= recvfrom(ecurr
.sock
, msg
, sizeof(msg
), 0,
505 if (unlikely(ret
<= 0))
508 if (raddrlen
!= ecurr
.addrlen
||
509 memcmp(&raddr
, &ecurr
.addr
, raddrlen
)) {
510 memset(msg
, 0, sizeof(msg
));
512 thdr
= (struct transsip_hdr
*) msg
;
515 sendto(ecurr
.sock
, msg
, sizeof(*thdr
), 0,
521 thdr
= (struct transsip_hdr
*) msg
;
522 if (thdr
->fin
== 1) {
523 whine("Remote end hung up!\n");
527 packet
.data
= msg
+ sizeof(*thdr
);
528 packet
.len
= ret
- sizeof(*thdr
);
529 packet
.timestamp
= ntohl(thdr
->seq
);
530 packet
.span
= FRAME_SIZE
;
533 jitter_buffer_put(jitter
, &packet
);
537 if (alsa_play_ready(dev
, pfds
, nfds
)) {
538 short pcm
[FRAME_SIZE
* CHANNELS
];
541 JitterBufferPacket packet
;
543 memset(msg
, 0, sizeof(msg
));
545 packet
.len
= MAX_MSG
;
547 jitter_buffer_tick(jitter
);
548 jitter_buffer_get(jitter
, &packet
,
553 celt_decode(decoder
, (const unsigned char *)
554 packet
.data
, packet
.len
, pcm
);
556 for (i
= 0; i
< FRAME_SIZE
* CHANNELS
; ++i
)
560 alsa_write(dev
, pcm
, FRAME_SIZE
);
563 if (alsa_cap_ready(dev
, pfds
, nfds
)) {
564 short pcm
[FRAME_SIZE
* CHANNELS
];
566 alsa_read(dev
, pcm
, FRAME_SIZE
);
568 memset(msg
, 0, sizeof(msg
));
569 thdr
= (struct transsip_hdr
*) msg
;
571 celt_encode(encoder
, pcm
, NULL
, (unsigned char *)
572 (msg
+ sizeof(*thdr
)), PACKETSIZE
);
576 thdr
->seq
= htonl(send_seq
);
577 send_seq
+= FRAME_SIZE
;
579 ret
= sendto(ecurr
.sock
, msg
,
580 PACKETSIZE
+ sizeof(*thdr
), 0,
581 &ecurr
.addr
, ecurr
.addrlen
);
583 whine("Send datagram failed!\n");
592 memset(msg
, 0, sizeof(msg
));
593 thdr
= (struct transsip_hdr
*) msg
;
596 sendto(ecurr
.sock
, msg
, sizeof(*thdr
), 0, &ecurr
.addr
,
599 if (ecurr
.sock
== *csock
) {
604 celt_encoder_destroy(encoder
);
605 celt_decoder_destroy(decoder
);
606 celt_mode_destroy(mode
);
608 jitter_buffer_destroy(jitter
);
613 return ENGINE_STATE_IDLE
;
616 static inline void engine_drop_from_queue(int sock
)
619 recv(sock
, msg
, sizeof(msg
), 0);
622 static enum engine_state_num
engine_do_idle(int ssock
, int *csock
, int usocki
,
623 int usocko
, struct alsa_dev
*dev
)
627 struct pollfd fds
[2];
629 struct transsip_hdr
*thdr
;
632 assert(ecurr
.active
== 0);
635 fds
[0].events
= POLLIN
;
637 fds
[1].events
= POLLIN
;
639 while (likely(!quit
)) {
640 memset(msg
, 0, sizeof(msg
));
642 poll(fds
, array_size(fds
), -1);
643 for (i
= 0; i
< array_size(fds
); ++i
) {
644 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
647 if (fds
[i
].fd
== ssock
) {
648 ret
= recv(ssock
, msg
, sizeof(msg
), MSG_PEEK
);
651 if (ret
< sizeof(struct transsip_hdr
))
652 engine_drop_from_queue(ssock
);
654 thdr
= (struct transsip_hdr
*) msg
;
656 return ENGINE_STATE_CALLIN
;
658 engine_drop_from_queue(ssock
);
661 if (fds
[i
].fd
== usocki
) {
662 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
664 whine("Error reading from cli!\n");
668 return ENGINE_STATE_CALLOUT
;
673 return ENGINE_STATE_IDLE
;
676 struct engine_state state_machine
[__ENGINE_STATE_MAX
] __read_mostly
= {
677 STATE_MAP_SET(ENGINE_STATE_IDLE
, engine_do_idle
),
678 STATE_MAP_SET(ENGINE_STATE_CALLOUT
, engine_do_callout
),
679 STATE_MAP_SET(ENGINE_STATE_CALLIN
, engine_do_callin
),
680 STATE_MAP_SET(ENGINE_STATE_SPEAKING
, engine_do_speaking
),
683 void *engine_main(void *arg
)
685 int ssock
= -1, csock
, ret
, mtu
, usocki
, usocko
;
686 enum engine_state_num state
;
687 struct addrinfo hints
, *ahead
, *ai
;
688 struct alsa_dev
*dev
= NULL
;
689 struct pipepair
*pp
= arg
;
691 init_call_notifier();
699 memset(&hints
, 0, sizeof(hints
));
700 hints
.ai_family
= PF_UNSPEC
;
701 hints
.ai_socktype
= SOCK_DGRAM
;
702 hints
.ai_protocol
= IPPROTO_UDP
;
703 hints
.ai_flags
= AI_PASSIVE
;
705 ret
= getaddrinfo(NULL
, port
, &hints
, &ahead
);
707 panic("Cannot get address info!\n");
709 for (ai
= ahead
; ai
!= NULL
&& ssock
< 0; ai
= ai
->ai_next
) {
710 ssock
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
);
713 if (ai
->ai_family
== AF_INET6
) {
716 ret
= setsockopt(ssock
, IPPROTO_IPV6
, IPV6_V6ONLY
,
727 #endif /* IPV6_V6ONLY */
730 mtu
= IP_PMTUDISC_DONT
;
731 setsockopt(ssock
, SOL_IP
, IP_MTU_DISCOVER
, &mtu
, sizeof(mtu
));
733 ret
= bind(ssock
, ai
->ai_addr
, ai
->ai_addrlen
);
743 panic("Cannot open socket!\n");
745 dev
= alsa_open(alsadev
, SAMPLING_RATE
, CHANNELS
, FRAME_SIZE
);
747 panic("Cannot open ALSA device %s!\n", alsadev
);
749 state
= ENGINE_STATE_IDLE
;
750 while (likely(!quit
)) {
752 call_notifier_exec(CALL_STATE_MACHINE_CHANGED
, &arg
);
753 state
= state_machine
[state
].process(ssock
, &csock
,