9 #include <speex/speex_jitter.h>
10 #include <speex/speex_echo.h>
11 #include <netinet/in.h>
12 #include <arpa/inet.h>
22 #define SAMPLING_RATE 48000
23 #define FRAME_SIZE 256
31 __extension__
uint8_t est
:1,
36 } __attribute__((packed
));
38 enum engine_state_num
{
39 ENGINE_STATE_IDLE
= 0,
42 ENGINE_STATE_SPEAKING
,
46 enum engine_sound_type
{
47 ENGINE_SOUND_DIAL
= 0,
53 volatile enum engine_state_num state
;
54 enum engine_state_num (*process
)(int, int *, int, int,
58 #define STATE_MAP_SET(s, f) { \
63 extern sig_atomic_t quit
;
65 static char *alsadev
= "plughw:0,0"; //XXX
66 static char *port
= "30111"; //XXX
75 static struct engine_curr ecurr
;
77 static void engine_play_file(struct alsa_dev
*dev
, enum engine_sound_type type
)
81 short pcm
[FRAME_SIZE
* CHANNELS
];
83 memset(path
, 0, sizeof(path
));
85 case ENGINE_SOUND_DIAL
:
86 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_DIAL
);
88 case ENGINE_SOUND_BUSY
:
89 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_BUSY
);
91 case ENGINE_SOUND_RING
:
92 slprintf(path
, sizeof(path
), "%s/%s", FILE_ETCDIR
, FILE_RING
);
96 fd
= open(path
, O_RDONLY
);
98 panic("Cannot open ring file!\n");
100 memset(pcm
, 0, sizeof(pcm
));
104 while (read(fd
, pcm
, sizeof(pcm
)) > 0) {
105 alsa_write(dev
, pcm
, FRAME_SIZE
);
106 alsa_read(dev
, pcm
, FRAME_SIZE
);
107 memset(pcm
, 0, sizeof(pcm
));
115 static inline void engine_play_ring(struct alsa_dev
*dev
)
117 engine_play_file(dev
, ENGINE_SOUND_RING
);
120 static inline void engine_play_busy(struct alsa_dev
*dev
)
122 engine_play_file(dev
, ENGINE_SOUND_BUSY
);
125 static inline void engine_play_dial(struct alsa_dev
*dev
)
127 engine_play_file(dev
, ENGINE_SOUND_DIAL
);
130 static enum engine_state_num
engine_do_callout(int ssock
, int *csock
, int usocki
,
131 int usocko
, struct alsa_dev
*dev
)
133 int one
, mtu
, tries
, i
;
136 struct addrinfo hints
, *ahead
, *ai
;
138 struct pollfd fds
[3];
139 struct sockaddr raddr
;
141 struct transsip_hdr
*thdr
;
143 assert(ecurr
.active
== 0);
145 whine("In Callout!\n");
147 memset(&cpkt
, 0, sizeof(cpkt
));
148 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
149 if (ret
!= sizeof(cpkt
)) {
150 whine("Error receiving packet from cmdline!\n");
151 return ENGINE_STATE_IDLE
;
154 return ENGINE_STATE_IDLE
;
156 memset(&hints
, 0, sizeof(hints
));
157 hints
.ai_family
= PF_UNSPEC
;
158 hints
.ai_socktype
= SOCK_DGRAM
;
159 hints
.ai_protocol
= IPPROTO_UDP
;
160 hints
.ai_flags
= AI_NUMERICSERV
;
162 ret
= getaddrinfo(cpkt
.address
, cpkt
.port
, &hints
, &ahead
);
164 whine("Cannot get address info for %s:%s!\n",
165 cpkt
.address
, cpkt
.port
);
166 return ENGINE_STATE_IDLE
;
171 for (ai
= ahead
; ai
!= NULL
&& *csock
< 0; ai
= ai
->ai_next
) {
172 *csock
= socket(ai
->ai_family
, ai
->ai_socktype
,
177 ret
= connect(*csock
, ai
->ai_addr
, ai
->ai_addrlen
);
179 whine("Cannot connect to remote!\n");
186 setsockopt(*csock
, SOL_SOCKET
, SO_KEEPALIVE
, &one
, sizeof(one
));
188 mtu
= IP_PMTUDISC_DONT
;
189 setsockopt(*csock
, SOL_IP
, IP_MTU_DISCOVER
, &mtu
, sizeof(mtu
));
191 memcpy(&ecurr
.addr
, ai
->ai_addr
, ai
->ai_addrlen
);
192 ecurr
.addrlen
= ai
->ai_addrlen
;
199 whine("Cannot connect to server!\n");
200 return ENGINE_STATE_IDLE
;
205 memset(msg
, 0, sizeof(msg
));
206 thdr
= (struct transsip_hdr
*) msg
;
209 ret
= sendto(*csock
, msg
, sizeof(*thdr
), 0, &ecurr
.addr
,
212 whine("Cannot send ring probe to server!\n");
217 fds
[0].events
= POLLIN
;
219 fds
[1].events
= POLLIN
;
221 fds
[2].events
= POLLIN
;
223 while (!quit
&& tries
++ < 100) {
224 poll(fds
, array_size(fds
), 1500);
226 for (i
= 0; i
< array_size(fds
); ++i
) {
227 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
230 if (fds
[i
].fd
== usocki
) {
231 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
235 whine("User aborted call!\n");
240 if (fds
[i
].fd
== ssock
) {
241 memset(msg
, 0, sizeof(msg
));
242 ret
= recvfrom(ssock
, msg
, sizeof(msg
), 0,
247 memset(msg
, 0, sizeof(msg
));
248 thdr
= (struct transsip_hdr
*) msg
;
251 sendto(ssock
, msg
, sizeof(*thdr
), 0, &raddr
,
255 if (fds
[i
].fd
== *csock
) {
256 memset(msg
, 0, sizeof(msg
));
257 ret
= recvfrom(*csock
, msg
, sizeof(msg
), 0,
261 if (raddrlen
!= ecurr
.addrlen
)
263 if (memcmp(&raddr
, &ecurr
.addr
, raddrlen
))
266 thdr
= (struct transsip_hdr
*) msg
;
267 if (thdr
->est
== 1 && thdr
->psh
== 1) {
269 whine("Call established!\n");
270 return ENGINE_STATE_SPEAKING
;
272 if (thdr
->bsy
== 1) {
273 whine("Remote end busy!\n");
274 engine_play_busy(dev
);
275 engine_play_busy(dev
);
281 engine_play_dial(dev
);
287 return ENGINE_STATE_IDLE
;
290 static enum engine_state_num
engine_do_callin(int ssock
, int *csock
, int usocki
,
291 int usocko
, struct alsa_dev
*dev
)
296 struct sockaddr raddr
;
298 struct transsip_hdr
*thdr
;
299 char hbuff
[256], sbuff
[256];
300 struct pollfd fds
[2];
303 assert(ecurr
.active
== 0);
305 whine("In Callin!\n");
307 memset(&msg
, 0, sizeof(msg
));
308 ret
= recvfrom(ssock
, msg
, sizeof(msg
), 0, &raddr
, &raddrlen
);
311 return ENGINE_STATE_IDLE
;
314 thdr
= (struct transsip_hdr
*) msg
;
315 if (thdr
->est
!= 1) {
317 return ENGINE_STATE_IDLE
;
320 memcpy(&ecurr
.addr
, &raddr
, raddrlen
);
321 ecurr
.addrlen
= raddrlen
;
325 memset(hbuff
, 0, sizeof(hbuff
));
326 memset(sbuff
, 0, sizeof(sbuff
));
327 getnameinfo((struct sockaddr
*) &raddr
, raddrlen
, hbuff
, sizeof(hbuff
),
328 sbuff
, sizeof(sbuff
), NI_NUMERICHOST
| NI_NUMERICSERV
);
330 printf("New incoming connection from %s:%s!\n", hbuff
, sbuff
);
331 printf("Answer it with: take\n");
332 printf("Reject it with: hangup\n");
336 fds
[0].events
= POLLIN
;
338 fds
[1].events
= POLLIN
;
340 while (likely(!quit
)) {
341 poll(fds
, array_size(fds
), 1500);
343 for (i
= 0; i
< array_size(fds
); ++i
) {
344 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
347 if (fds
[i
].fd
== ssock
) {
348 memset(msg
, 0, sizeof(msg
));
349 ret
= recvfrom(ssock
, msg
, sizeof(msg
), 0,
353 if (raddrlen
!= ecurr
.addrlen
)
355 if (memcmp(&raddr
, &ecurr
.addr
, raddrlen
))
358 if (thdr
->fin
== 1) {
359 whine("Remote end hung up!\n");
360 engine_play_busy(dev
);
361 engine_play_busy(dev
);
367 if (fds
[i
].fd
== usocki
) {
368 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
372 memset(&msg
, 0, sizeof(msg
));
373 thdr
= (struct transsip_hdr
*) msg
;
376 sendto(ssock
, msg
, sizeof(*thdr
), 0,
377 &ecurr
.addr
, ecurr
.addrlen
);
379 whine("You aborted call!\n");
383 memset(&msg
, 0, sizeof(msg
));
384 thdr
= (struct transsip_hdr
*) msg
;
388 ret
= sendto(ssock
, msg
, sizeof(*thdr
), 0,
389 &ecurr
.addr
, ecurr
.addrlen
);
391 whine("Error sending ack!\n");
396 whine("Call established!\n");
397 return ENGINE_STATE_SPEAKING
;
402 engine_play_ring(dev
);
406 return ENGINE_STATE_IDLE
;
409 static enum engine_state_num
engine_do_speaking(int ssock
, int *csock
,
410 int usocki
, int usocko
,
411 struct alsa_dev
*dev
)
414 int recv_started
= 0, nfds
= 0, tmp
, i
;
415 struct pollfd
*pfds
= NULL
;
417 uint32_t send_seq
= 0;
419 CELTEncoder
*encoder
;
420 CELTDecoder
*decoder
;
421 JitterBuffer
*jitter
;
422 SpeexEchoState
*echostate
;
423 struct sockaddr raddr
;
424 struct transsip_hdr
*thdr
;
428 assert(ecurr
.active
== 1);
430 whine("In Speaking!\n");
432 mode
= celt_mode_create(SAMPLING_RATE
, FRAME_SIZE
, NULL
);
433 encoder
= celt_encoder_create(mode
, CHANNELS
, NULL
);
434 decoder
= celt_decoder_create(mode
, CHANNELS
, NULL
);
436 jitter
= jitter_buffer_init(FRAME_SIZE
);
438 jitter_buffer_ctl(jitter
, JITTER_BUFFER_SET_MARGIN
, &tmp
);
440 echostate
= speex_echo_state_init(FRAME_SIZE
, 10 * FRAME_SIZE
);
442 speex_echo_ctl(echostate
, SPEEX_ECHO_SET_SAMPLING_RATE
, &tmp
);
444 nfds
= alsa_nfds(dev
);
445 pfds
= xmalloc(sizeof(*pfds
) * (nfds
+ 2));
447 alsa_getfds(dev
, pfds
, nfds
);
449 pfds
[nfds
].fd
= ecurr
.sock
;
450 pfds
[nfds
].events
= POLLIN
;
451 pfds
[nfds
+ 1].fd
= usocki
;
452 pfds
[nfds
+ 1].events
= POLLIN
;
456 while (likely(!quit
)) {
457 poll(pfds
, nfds
+ 2, -1);
459 if (pfds
[nfds
+ 1].revents
& POLLIN
) {
460 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
464 whine("You aborted call!\n");
469 if (pfds
[nfds
].revents
& POLLIN
) {
470 JitterBufferPacket packet
;
472 memset(msg
, 0, sizeof(msg
));
473 ret
= recvfrom(ecurr
.sock
, msg
, sizeof(msg
), 0,
475 if (unlikely(ret
<= 0))
477 if (raddrlen
!= ecurr
.addrlen
||
478 memcmp(&raddr
, &ecurr
.addr
, raddrlen
)) {
479 memset(msg
, 0, sizeof(msg
));
481 thdr
= (struct transsip_hdr
*) msg
;
484 sendto(ecurr
.sock
, msg
, sizeof(*thdr
), 0,
490 thdr
= (struct transsip_hdr
*) msg
;
491 if (thdr
->fin
== 1 || thdr
->psh
== 0)
494 packet
.data
= msg
+ sizeof(*thdr
);
495 packet
.len
= ret
- sizeof(*thdr
);
496 packet
.timestamp
= ntohl(thdr
->seq
);
497 packet
.span
= FRAME_SIZE
;
500 jitter_buffer_put(jitter
, &packet
);
504 if (alsa_play_ready(dev
, pfds
, nfds
)) {
505 short pcm
[FRAME_SIZE
* CHANNELS
];
508 JitterBufferPacket packet
;
510 memset(msg
, 0, sizeof(msg
));
512 packet
.len
= MAX_MSG
;
514 jitter_buffer_tick(jitter
);
515 jitter_buffer_get(jitter
, &packet
,
520 celt_decode(decoder
, (const unsigned char *)
521 packet
.data
, packet
.len
, pcm
);
523 for (i
= 0; i
< FRAME_SIZE
* CHANNELS
; ++i
)
527 alsa_write(dev
, pcm
, FRAME_SIZE
);
528 speex_echo_state_reset(echostate
);
529 speex_echo_playback(echostate
, pcm
);
532 if (alsa_cap_ready(dev
, pfds
, nfds
)) {
533 short pcm
[FRAME_SIZE
* CHANNELS
];
534 short pcm2
[FRAME_SIZE
* CHANNELS
];
536 alsa_read(dev
, pcm
, FRAME_SIZE
);
538 speex_echo_capture(echostate
, pcm
, pcm2
);
539 for (i
= 0; i
< FRAME_SIZE
* CHANNELS
; ++i
)
542 memset(msg
, 0, sizeof(msg
));
543 thdr
= (struct transsip_hdr
*) msg
;
545 celt_encode(encoder
, pcm
, NULL
, (unsigned char *)
546 (msg
+ sizeof(*thdr
)), PACKETSIZE
);
550 thdr
->seq
= htonl(send_seq
);
551 send_seq
+= FRAME_SIZE
;
553 ret
= sendto(ecurr
.sock
, msg
,
554 PACKETSIZE
+ sizeof(*thdr
), 0,
555 &ecurr
.addr
, ecurr
.addrlen
);
557 whine("Send datagram failed!\n");
566 memset(msg
, 0, sizeof(msg
));
567 thdr
= (struct transsip_hdr
*) msg
;
570 sendto(ecurr
.sock
, msg
, sizeof(*thdr
), 0, &ecurr
.addr
,
573 if (ecurr
.sock
== *csock
) {
578 celt_encoder_destroy(encoder
);
579 celt_decoder_destroy(decoder
);
580 celt_mode_destroy(mode
);
582 jitter_buffer_destroy(jitter
);
583 speex_echo_state_destroy(echostate
);
588 return ENGINE_STATE_IDLE
;
591 static enum engine_state_num
engine_do_idle(int ssock
, int *csock
, int usocki
,
592 int usocko
, struct alsa_dev
*dev
)
596 struct pollfd fds
[2];
598 struct transsip_hdr
*thdr
;
601 assert(ecurr
.active
== 0);
606 fds
[0].events
= POLLIN
;
608 fds
[1].events
= POLLIN
;
610 while (likely(!quit
)) {
611 memset(msg
, 0, sizeof(msg
));
613 poll(fds
, array_size(fds
), -1);
614 for (i
= 0; i
< array_size(fds
); ++i
) {
615 if ((fds
[i
].revents
& POLLIN
) != POLLIN
)
618 if (fds
[i
].fd
== ssock
) {
619 ret
= recv(ssock
, msg
, sizeof(msg
), MSG_PEEK
);
622 thdr
= (struct transsip_hdr
*) msg
;
624 return ENGINE_STATE_CALLIN
;
627 if (fds
[i
].fd
== usocki
) {
628 ret
= read(usocki
, &cpkt
, sizeof(cpkt
));
632 return ENGINE_STATE_CALLOUT
;
637 return ENGINE_STATE_IDLE
;
640 struct engine_state state_machine
[__ENGINE_STATE_MAX
] __read_mostly
= {
641 STATE_MAP_SET(ENGINE_STATE_IDLE
, engine_do_idle
),
642 STATE_MAP_SET(ENGINE_STATE_CALLOUT
, engine_do_callout
),
643 STATE_MAP_SET(ENGINE_STATE_CALLIN
, engine_do_callin
),
644 STATE_MAP_SET(ENGINE_STATE_SPEAKING
, engine_do_speaking
),
647 void *engine_main(void *arg
)
649 int ssock
= -1, csock
, ret
, mtu
, usocki
, usocko
;
650 enum engine_state_num state
;
651 struct addrinfo hints
, *ahead
, *ai
;
652 struct alsa_dev
*dev
= NULL
;
653 struct pipepair
*pp
= arg
;
658 memset(&hints
, 0, sizeof(hints
));
659 hints
.ai_family
= PF_UNSPEC
;
660 hints
.ai_socktype
= SOCK_DGRAM
;
661 hints
.ai_protocol
= IPPROTO_UDP
;
662 hints
.ai_flags
= AI_PASSIVE
;
664 ret
= getaddrinfo(NULL
, port
, &hints
, &ahead
);
666 panic("Cannot get address info!\n");
668 for (ai
= ahead
; ai
!= NULL
&& ssock
< 0; ai
= ai
->ai_next
) {
669 ssock
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
);
672 if (ai
->ai_family
== AF_INET6
) {
675 ret
= setsockopt(ssock
, IPPROTO_IPV6
, IPV6_V6ONLY
,
686 #endif /* IPV6_V6ONLY */
689 mtu
= IP_PMTUDISC_DONT
;
690 setsockopt(ssock
, SOL_IP
, IP_MTU_DISCOVER
, &mtu
, sizeof(mtu
));
692 ret
= bind(ssock
, ai
->ai_addr
, ai
->ai_addrlen
);
702 panic("Cannot open socket!\n");
704 dev
= alsa_open(alsadev
, SAMPLING_RATE
, CHANNELS
, FRAME_SIZE
);
706 panic("Cannot open ALSA device %s!\n", alsadev
);
708 state
= ENGINE_STATE_IDLE
;
709 while (likely(!quit
)) {
710 state
= state_machine
[state
].process(ssock
, &csock
,