4 #include "library/filesys.hpp"
5 #include "library/minmax.hpp"
6 #include "library/workthread.hpp"
7 #include "library/serialization.hpp"
8 #include "library/string.hpp"
9 #include "library/ogg.hpp"
10 #include "core/audioapi.hpp"
11 #include "core/command.hpp"
12 #include "core/dispatch.hpp"
13 #include "core/framerate.hpp"
14 #include "core/inthread.hpp"
15 #include "core/keymapper.hpp"
16 #include "core/misc.hpp"
27 #include "opus/opus.h"
28 #include "opus/opus_defines.h"
30 //Farther than this, packets can be fastskipped.
31 #define OPUS_CONVERGE_MAX 5760
32 //Maximum size of PCM output for one packet.
33 #define OPUS_MAX_OUT 5760
35 #define OUTPUT_BLOCK 1440
37 #define OPUS_SAMPLERATE 48000
39 #define OPUS_BLOCK_SIZE 960
40 //Threshold for decoding additional block
41 #define BLOCK_THRESHOLD 1200
42 //Maximum output block size.
43 #define OUTPUT_SIZE (BLOCK_THRESHOLD + OUTPUT_BLOCK)
44 //Amount of microseconds per iteration.
45 #define ITERATION_TIME 15000
46 //Opus bitrate to use.
47 #define OPUS_BITRATE 48000
48 //Ogg Opus granule rate.
49 #define OGGOPUS_GRANULERATE 48000
50 //Record buffer size threshold divider.
51 #define REC_THRESHOLD_DIV 40
52 //Playback buffer size threshold divider.
53 #define PLAY_THRESHOLD_DIV 30
54 //Special granule position: None.
55 #define GRANULEPOS_NONE 0xFFFFFFFFFFFFFFFFULL
59 class opus_playback_stream
;
61 class stream_collection
;
63 //Recording active flag.
64 volatile bool active_flag
= false;
65 //Last seen frame number.
66 uint64_t last_frame_number
= 0;
69 //Mutex protecting current_time and time_jump.
70 mutex_class time_mutex
;
72 uint64_t current_time
;
73 //Time jump flag. Set if time jump is detected.
74 //If time jump is detected, all current playing streams are stopped, stream locks are cleared and
75 //appropriate streams are restarted. If time jump is false, all unlocked streams coming into range
78 //Lock protecting active_playback_streams.
79 mutex_class active_playback_streams_lock
;
80 //List of streams currently playing.
81 std::list
<opus_playback_stream
*> active_playback_streams
;
82 //The collection of streams.
83 stream_collection
* current_collection
;
84 //Lock protecting current collection.
85 mutex_class current_collection_lock
;
87 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
88 //Information about individual opus packet in stream.
89 struct opus_packetinfo
91 //Length is in units of 1/400th of a second.
92 opus_packetinfo(uint16_t datasize
, uint8_t length
, uint64_t offset
)
94 descriptor
= (offset
& 0xFFFFFFFFFFULL
) | (static_cast<uint64_t>(length
) << 40) |
95 (static_cast<uint64_t>(datasize
) << 48);
97 //Get the data size of the packet.
98 uint16_t size() { return descriptor
>> 48; }
99 //Calculate the length of packet in samples.
100 uint16_t length() { return 120 * ((descriptor
>> 40) & 0xFF); }
101 //Calculate the true offset.
102 uint64_t offset() { return descriptor
& 0xFFFFFFFFFFULL
; }
105 std::vector
<unsigned char> packet(filesystem_ref from_sys
);
110 std::vector
<unsigned char> opus_packetinfo::packet(filesystem_ref from_sys
)
112 std::vector
<unsigned char> ret
;
113 uint64_t off
= offset();
114 uint32_t sz
= size();
115 uint32_t cluster
= off
/ CLUSTER_SIZE
;
116 uint32_t coff
= off
% CLUSTER_SIZE
;
118 size_t r
= from_sys
.read_data(cluster
, coff
, &ret
[0], sz
);
120 throw std::runtime_error("Incomplete read");
124 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
125 //Information about opus stream.
128 //Create new empty stream with specified base time.
129 opus_stream(uint64_t base
, filesystem_ref filesys
);
130 //Read stream with specified base time and specified start clusters.
132 opus_stream(uint64_t base
, filesystem_ref filesys
, uint32_t ctrl_cluster
, uint32_t data_cluster
);
133 //Import a stream with specified base time.
135 opus_stream(uint64_t base
, filesystem_ref filesys
, std::ifstream
& data
,
136 external_stream_format extfmt
);
137 //Delete this stream (also puts a ref)
138 void delete_stream() { deleting
= true; put_ref(); }
141 void export_stream(std::ofstream
& data
, external_stream_format extfmt
);
142 //Get length of specified packet in samples.
143 uint16_t packet_length(uint32_t seqno
)
145 return (seqno
< packets
.size()) ? packets
[seqno
].length() : 0;
147 //Get data of specified packet.
149 std::vector
<unsigned char> packet(uint32_t seqno
)
151 return (seqno
< packets
.size()) ? packets
[seqno
].packet(fs
) : std::vector
<unsigned char>();
153 //Get base time in samples for stream.
154 uint64_t timebase() { return s_timebase
; }
155 //Set base time in samples for stream.
156 void timebase(uint64_t ts
) { s_timebase
= ts
; }
157 //Get length of stream in samples.
160 if(pregap_length
+ postgap_length
> total_len
)
163 return total_len
- pregap_length
- postgap_length
;
165 //Set the pregap length.
166 void set_pregap(uint32_t p
) { pregap_length
= p
; }
167 //Get the pregap length.
168 uint32_t get_pregap() { return pregap_length
; }
169 //Set the postgap length.
170 void set_potsgap(uint32_t p
) { postgap_length
= p
; }
171 //Get the postgap length.
172 uint32_t get_postgap() { return postgap_length
; }
174 void set_gain(int16_t g
) { gain
= g
; }
176 int16_t get_gain() { return gain
; }
178 float get_gain_linear() { return pow(10, gain
/ 20); }
179 //Get number of packets in stream.
180 uint32_t blocks() { return packets
.size(); }
181 //Is this stream locked?
182 bool islocked() { return locked
; }
184 void lock() { locked
= true; }
186 void unlock() { locked
= false; }
187 //Increment reference count.
188 void get_ref() { umutex_class
m(reflock
); refcount
++; }
189 //Decrement reference count, destroying object if it hits zero.
190 void put_ref() { umutex_class
m(reflock
); refcount
--; if(!refcount
) destroy(); }
191 //Add new packet into stream.
192 //Not safe to call simultaneously with packet_length() or packet().
194 void write(uint8_t len
, const unsigned char* payload
, size_t payload_len
);
195 //Write stream trailer.
196 void write_trailier();
198 std::pair
<uint32_t, uint32_t> get_clusters() { return std::make_pair(ctrl_cluster
, data_cluster
); }
200 void export_stream_opusdemo(std::ofstream
& data
);
201 void export_stream_sox(std::ofstream
& data
);
202 void export_stream_oggopus(std::ofstream
& data
);
203 void import_stream_opusdemo(std::ifstream
& data
);
204 void import_stream_sox(std::ifstream
& data
);
205 void import_stream_oggopus(std::ifstream
& data
);
207 opus_stream(const opus_stream
&);
208 opus_stream
& operator=(const opus_stream
&);
211 std::vector
<opus_packetinfo
> packets
;
214 uint32_t next_cluster
;
215 uint32_t next_offset
;
216 uint32_t next_mcluster
;
217 uint32_t next_moffset
;
218 uint32_t ctrl_cluster
;
219 uint32_t data_cluster
;
220 uint32_t pregap_length
;
221 uint32_t postgap_length
;
229 opus_stream::opus_stream(uint64_t base
, filesystem_ref filesys
)
248 opus_stream::opus_stream(uint64_t base
, filesystem_ref filesys
, uint32_t _ctrl_cluster
,
249 uint32_t _data_cluster
)
257 next_cluster
= data_cluster
= _data_cluster
;
258 next_mcluster
= ctrl_cluster
= _ctrl_cluster
;
264 //Read the data buffers.
265 char buf
[CLUSTER_SIZE
];
266 uint32_t last_cluster_seen
= next_mcluster
;
267 uint64_t total_size
= 0;
268 uint64_t total_frames
= 0;
269 bool trailers
= false;
270 bool saved_pointer_valid
= false;
271 uint32_t saved_next_mcluster
= 0;
272 uint32_t saved_next_moffset
= 0;
274 last_cluster_seen
= next_mcluster
;
275 size_t r
= fs
.read_data(next_mcluster
, next_moffset
, buf
, CLUSTER_SIZE
);
277 //The stream ends here.
280 //Find the first unused entry if any.
281 for(unsigned i
= 0; i
< CLUSTER_SIZE
; i
+= 4)
282 if(!buf
[i
+ 3] || trailers
) {
283 //This entry is unused. If the next entry is also unused, that is the end.
284 //Otherwise, there might be stream trailers.
285 if(trailers
&& !buf
[i
+ 3]) {
286 goto out_parsing
; //Ends for real.
289 //Set the trailer flag and continue parsing.
290 //The saved offset must be placed here.
291 saved_next_mcluster
= last_cluster_seen
;
292 saved_next_moffset
= i
;
293 saved_pointer_valid
= true;
297 //This is a trailer entry.
298 if(buf
[i
+ 3] == 2) {
300 pregap_length
= read32ube(buf
+ i
) >> 8;
301 } else if(buf
[i
+ 3] == 3) {
303 postgap_length
= read32ube(buf
+ i
) >> 8;
304 } else if(buf
[i
+ 3] == 4) {
306 gain
= read16sbe(buf
+ i
);
309 uint16_t psize
= read16ube(buf
+ i
);
310 uint8_t plen
= read8ube(buf
+ i
+ 2);
312 total_len
+= 120 * plen
;
313 opus_packetinfo
p(psize
, plen
, 1ULL * next_cluster
* CLUSTER_SIZE
+
315 size_t r2
= fs
.skip_data(next_cluster
, next_offset
, psize
);
317 throw std::runtime_error("Incomplete data stream");
318 packets
.push_back(p
);
323 //If saved pointer is valid, restore to that.
324 if(saved_pointer_valid
) {
325 next_mcluster
= saved_next_mcluster
;
326 next_moffset
= saved_next_moffset
;
330 opus_stream::opus_stream(uint64_t base
, filesystem_ref filesys
, std::ifstream
& data
,
331 external_stream_format extfmt
)
348 if(extfmt
== EXTFMT_OPUSDEMO
)
349 import_stream_opusdemo(data
);
350 else if(extfmt
== EXTFMT_OGGOPUS
)
351 import_stream_oggopus(data
);
352 else if(extfmt
== EXTFMT_SOX
)
353 import_stream_sox(data
);
356 void opus_stream::import_stream_opusdemo(std::ifstream
& data
)
359 unsigned char tmpi
[65536];
360 float tmp
[OPUS_MAX_OUT
];
361 OpusDecoder
* dec
= opus_decoder_create(48000, 1, &err
);
367 uint32_t psize
= read32ube(head
);
368 uint32_t pstate
= read32ube(head
+ 4);
369 if(psize
> sizeof(tmpi
)) {
370 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
371 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
372 opus_decoder_destroy(dec
);
373 throw std::runtime_error("Packet too large to decode");
375 data
.read(reinterpret_cast<char*>(tmpi
), psize
);
377 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
378 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
379 opus_decoder_destroy(dec
);
380 throw std::runtime_error("Error reading opus packet");
382 int r
= opus_decode_float(dec
, tmpi
, psize
, tmp
,
385 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
386 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
387 opus_decoder_destroy(dec
);
388 (stringfmt() << "Error decoding opus packet: " << opus_strerror(r
)).throwex();
391 opus_decoder_ctl(dec
, OPUS_GET_FINAL_RANGE(&cstate
));
392 if(cstate
!= pstate
) {
393 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
394 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
395 opus_decoder_destroy(dec
);
396 throw std::runtime_error("Opus packet checksum mismatch");
398 r
= opus_decoder_get_nb_samples(dec
, tmpi
, psize
);
399 if(r
< 0 || r
% 120) {
400 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
401 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
402 opus_decoder_destroy(dec
);
403 throw std::runtime_error("Error getting length of opus packet");
405 uint8_t plen
= r
/ 120;
407 write(plen
, tmpi
, psize
);
409 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
410 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
411 opus_decoder_destroy(dec
);
415 opus_decoder_destroy(dec
);
419 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
420 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
425 class oggopus_importer
430 const unsigned char* data
;
435 void put_page(const ogg_page
& p
);
436 bool packet_pending();
438 size_t get_postgap();
440 oggopus_importer(const oggopus_importer
&);
441 oggopus_importer
& operator=(const oggopus_importer
&);
442 std::vector
<uint8_t> incomplete
;
443 std::vector
<uint8_t> pincomplete
;
444 const ogg_page
* curpage
;
450 uint64_t expected_granule
;
451 bool granule_correction_done
;
454 oggopus_importer::oggopus_importer()
461 incomplete_f
= false;
462 expected_granule
= 0;
463 granule_correction_done
= false;
466 void oggopus_importer::put_page(const ogg_page
& p
)
468 //If not continued packet, drop the last packet.
469 if(!p
.get_continue() && incomplete_f
)
470 messages
<< "Warning: Incomplete packet not continued by the next page" << std::endl
;
471 if(!p
.get_continue())
473 //If totally empty page...
474 if(p
.get_packet_count() == 0) {
475 if(p
.get_granulepos() != ogg_page::granulepos_none
)
476 (stringfmt() << "Bad granulepos in empty page: Expected -1, got "
477 << p
.get_granulepos()).throwex();
483 //If continued packet, paste packet 0 to previous.
484 size_t tmppackets
= p
.get_packet_count();
486 if((tmp_incomplete
= p
.get_last_packet_incomplete()))
488 if(tmppackets
== 0 && p
.get_granulepos() != ogg_page::granulepos_none
)
489 (stringfmt() << "Bad granulepos in page with no complete packets: Expected -1, got "
490 << p
.get_granulepos()).throwex();
491 if(tmppackets
> 0 && p
.get_granulepos() == ogg_page::granulepos_none
)
492 (stringfmt() << "Bad granulepos in page with complete packets: Expected != -1, got "
494 auto p0
= p
.get_packet(0);
495 size_t off
= incomplete
.size();
496 incomplete
.resize(off
+ p0
.second
);
498 memcpy(&incomplete
[off
], p0
.first
, p0
.second
);
499 incomplete_f
= tmp_incomplete
;
500 pagepackets
= tmppackets
;
503 eospage
= p
.get_eos();
506 bool oggopus_importer::packet_pending()
508 return (curpacket
< pagepackets
);
511 oggopus_importer::packet
oggopus_importer::get_packet()
513 oggopus_importer::packet p
;
515 //Packet 0 is special.
516 pincomplete
= incomplete
;
517 p
.data
= &pincomplete
[0];
518 p
.size
= pincomplete
.size();
520 auto pn
= curpage
->get_packet(curpacket
);
525 if(curpacket
== pagepackets
&& incomplete_f
) {
526 //Copy the incomplete page to buffer.
527 auto pn
= curpage
->get_packet(pagepackets
);
528 incomplete
.resize(pn
.second
);
530 memcpy(&incomplete
[0], pn
.first
, pn
.second
);
533 messages
<< "Warning, skipping empty Opus packet" << std::endl
;
536 int frames
= opus_packet_get_nb_frames(p
.data
, p
.size
);
537 int samples_pf
= opus_packet_get_samples_per_frame(p
.data
, OGGOPUS_GRANULERATE
);
538 if(frames
< 0 || samples_pf
< 0)
539 (stringfmt() << "Bad Opus packet").throwex();
540 p
.units
= frames
* samples_pf
/ 120;
542 expected_granule
+= p
.units
* 120;
543 if(curpacket
== pagepackets
&& curpage
->get_eos()) {
544 if(curpage
->get_eos()) {
545 //The postgap is expected_granule - granulepos.
546 if(curpage
->get_granulepos() > expected_granule
)
547 (stringfmt() << "Page granule too large, expected maximum of " <<
548 expected_granule
<< ", got " << curpage
->get_granulepos()).throwex();
549 postgap
= expected_granule
- curpage
->get_granulepos();
550 if(postgap
> p
.units
* 120) {
551 messages
<< "Warning, postgap too large, clipped to last packet" << std::endl
;
552 postgap
= p
.units
* 120;
554 } else if(!granule_correction_done
)
555 expected_granule
= curpage
->get_granulepos();
557 (stringfmt() << "Page granule invalid, expected " <<
558 expected_granule
<< ", got " << curpage
->get_granulepos()).throwex();
559 granule_correction_done
= true;
564 size_t oggopus_importer::get_postgap()
569 void opus_stream::import_stream_oggopus(std::ifstream
& data
)
571 ogg_stream_reader_iostreams
reader(data
);
573 oggopus_importer importer
;
574 uint32_t stream_seq
= 0; //The imprinting duckling model.
575 int state
= 0; //Not locked.
577 uint32_t last_page_seen
= 0xFFFFFFFFUL
;
578 bool seen_data
= false;
579 reader
.set_errors_to(messages
);
581 while(reader
.get_page(page
)) {
582 if(state
&& page
.get_stream() != stream_seq
)
583 continue; //Wrong stream.
584 if(state
&& page
.get_sequence() != last_page_seen
+ 1)
585 messages
<< "Warning: Packet(s) missing in OggOpus stream" << std::endl
;
586 last_page_seen
= page
.get_sequence();
587 struct oggopus_header h
;
588 struct oggopus_tags t
;
589 size_t packets
= page
.get_packet_count();
591 case 0: //Not locked.
593 h
= parse_oggopus_header(page
);
598 throw std::runtime_error("Multistream OggOpus streams are not "
600 state
= 1; //Expecting comment.
601 pregap_length
= h
.preskip
;
603 stream_seq
= page
.get_stream();
604 last_page_seen
= page
.get_sequence();
606 case 1: //Expecting comment.
607 t
= parse_oggopus_tags(page
);
608 state
= 2; //Data page.
610 throw std::runtime_error("Empty OggOpus stream");
611 //We don't do anything with this.
614 importer
.put_page(page
);
615 while(importer
.packet_pending()) {
616 auto p
= importer
.get_packet();
618 write(p
.units
, p
.data
, p
.size
);
621 state
= 3; //End of stream.
628 throw std::runtime_error("No OggOpus stream found");
630 throw std::runtime_error("Oggopus stream missing required tags page");
632 messages
<< "Warning: Incomplete Oggopus stream." << std::endl
;
633 postgap_length
= importer
.get_postgap();
636 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
637 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
642 void opus_stream::import_stream_sox(std::ifstream
& data
)
645 unsigned char tmpi
[65536];
646 float tmp
[OPUS_MAX_OUT
];
648 data
.read(header
, 32);
650 throw std::runtime_error("Can't read .sox header");
651 if(read32ule(header
+ 0) != 0x586F532EULL
)
652 throw std::runtime_error("Bad .sox header magic");
653 if(read8ube(header
+ 4) > 28)
654 data
.read(header
+ 32, read8ube(header
+ 4) - 28);
656 throw std::runtime_error("Can't read .sox header");
657 if(read64ule(header
+ 16) != 4676829883349860352ULL)
658 throw std::runtime_error("Bad .sox sampling rate");
659 if(read32ule(header
+ 24) != 1)
660 throw std::runtime_error("Only mono streams are supported");
661 uint64_t samples
= read64ule(header
+ 8);
662 OpusEncoder
* enc
= opus_encoder_create(48000, 1, OPUS_APPLICATION_VOIP
, &err
);
663 opus_encoder_ctl(enc
, OPUS_SET_BITRATE(OPUS_BITRATE
));
665 opus_encoder_ctl(enc
, OPUS_GET_LOOKAHEAD(&pregap
));
666 pregap_length
= pregap
;
667 for(uint64_t i
= 0; i
< samples
+ pregap
; i
+= OPUS_BLOCK_SIZE
) {
668 size_t bs
= OPUS_BLOCK_SIZE
;
669 if(i
+ bs
> samples
+ pregap
)
670 bs
= samples
+ pregap
- i
;
671 //We have to read zero bytes after the end of stream.
672 size_t readable
= bs
;
673 if(readable
+ i
> samples
)
674 readable
= max(samples
, i
) - i
;
676 data
.read(reinterpret_cast<char*>(tmpi
), 4 * readable
);
678 memset(tmpi
+ 4 * readable
, 0, 4 * (bs
- readable
));
680 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
681 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
682 opus_encoder_destroy(enc
);
683 throw std::runtime_error("Can't read .sox data");
685 for(size_t j
= 0; j
< bs
; j
++)
686 tmp
[j
] = static_cast<float>(read32sle(tmpi
+ 4 * j
)) / 268435456;
687 if(bs
< OPUS_BLOCK_SIZE
)
688 postgap_length
= OPUS_BLOCK_SIZE
- bs
;
689 for(size_t j
= bs
; j
< OPUS_BLOCK_SIZE
; j
++)
691 int r
= opus_encode_float(enc
, tmp
, OPUS_BLOCK_SIZE
, tmpi
, sizeof(tmpi
));
693 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
694 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
695 opus_encoder_destroy(enc
);
696 (stringfmt() << "Error encoding opus packet: " << opus_strerror(r
)).throwex();
699 write(OPUS_BLOCK_SIZE
/ 120, tmpi
, r
);
701 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
702 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
703 opus_encoder_destroy(enc
);
707 opus_encoder_destroy(enc
);
711 if(ctrl_cluster
) fs
.free_cluster_chain(ctrl_cluster
);
712 if(data_cluster
) fs
.free_cluster_chain(data_cluster
);
717 void opus_stream::destroy()
720 //We catch the errors and print em, because otherwise put_ref could throw, which would
723 fs
.free_cluster_chain(ctrl_cluster
);
724 } catch(std::exception
& e
) {
725 messages
<< "Failed to delete stream control file: " << e
.what();
728 fs
.free_cluster_chain(data_cluster
);
729 } catch(std::exception
& e
) {
730 messages
<< "Failed to delete stream data file: " << e
.what();
736 void opus_stream::export_stream_opusdemo(std::ofstream
& data
)
739 OpusDecoder
* dec
= opus_decoder_create(48000, 1, &err
);
740 std::vector
<unsigned char> p
;
741 float tmp
[OPUS_MAX_OUT
];
742 for(size_t i
= 0; i
< packets
.size(); i
++) {
747 } catch(std::exception
& e
) {
748 opus_decoder_destroy(dec
);
749 (stringfmt() << "Error reading opus packet: " << e
.what()).throwex();
751 int r
= opus_decode_float(dec
, &p
[0], p
.size(), tmp
, OPUS_MAX_OUT
, 0);
753 opus_decoder_destroy(dec
);
754 (stringfmt() << "Error decoding opus packet: " << opus_strerror(r
)).throwex();
756 opus_decoder_ctl(dec
, OPUS_GET_FINAL_RANGE(&state
));
757 write32ube(head
+ 0, p
.size());
758 write32ube(head
+ 4, state
);
760 data
.write(reinterpret_cast<char*>(&p
[0]), p
.size());
762 opus_decoder_destroy(dec
);
763 throw std::runtime_error("Error writing opus packet");
766 opus_decoder_destroy(dec
);
769 void opus_stream::export_stream_oggopus(std::ofstream
& data
)
771 oggopus_header header
;
773 ogg_stream_writer_iostreams
writer(data
);
777 header
.preskip
= pregap_length
;
778 header
.rate
= OPUS_SAMPLERATE
;
780 header
.map_family
= 0;
783 header
.chanmap
[0] = 0;
784 memset(header
.chanmap
+ 1, 255, 254);
785 tags
.vendor
= "lsnes rr" + lsnes_version
;
786 tags
.comments
.push_back((stringfmt() << "LSNES_STREAM_TS=" << s_timebase
).str());
787 struct ogg_page hpage
= serialize_oggopus_header(header
);
788 struct ogg_page tpage
= serialize_oggopus_tags(tags
);
789 struct ogg_page ppage
;
790 uint64_t true_granule
= 0;
795 writer
.put_page(hpage
);
796 writer
.put_page(tpage
);
797 for(size_t i
= 0; i
< packets
.size(); i
++) {
798 std::vector
<unsigned char> p
;
801 } catch(std::exception
& e
) {
802 (stringfmt() << "Error reading opus packet: " << e
.what()).throwex();
805 (stringfmt() << "Empty Opus packet is not valid").throwex();
806 int frames
= opus_packet_get_nb_frames(&p
[0], p
.size());
807 int samples_pf
= opus_packet_get_samples_per_frame(&p
[0], OGGOPUS_GRANULERATE
);
808 if(frames
< 0 || samples_pf
< 0)
809 (stringfmt() << "Bad Opus packet").throwex();
810 if(!ppage
.append_packet(&p
[0], p
.size())) {
812 ppage
.set_granulepos(true_granule
);
813 ppage
.set_sequence(seq
++);
814 writer
.put_page(ppage
);
815 ppage
= ogg_page(); //Reset.
816 if(!ppage
.append_packet(&p
[0], p
.size()))
817 throw std::runtime_error("Internal error: Opus packet larger than page");
819 true_granule
+= frames
* samples_pf
;
822 ppage
.set_sequence(seq
++);
823 ppage
.set_granulepos(true_granule
- postgap_length
);
824 writer
.put_page(ppage
);
//Export the stream as .sox PCM data.
//A 32-byte header is written first. Each packet is then decoded with opus_decode_float, scaled by
//the linear gain, and written out as 32-bit little-endian samples, leaving out pregap_throw samples
//at the front and postgap_throw samples at the back of the packet. Finally the header is rewritten
//with the total sample count (tlen).
//Throws std::runtime_error (or the stringfmt throwex() error) on read, decode and write failures.
//NOTE(review): several original lines of this function are not visible here (e.g. the declarations
//of err, header and tlen, and some if-conditions); the notes below rely only on the visible lines.
827 void opus_stream::export_stream_sox(std::ofstream
& data
)
830 OpusDecoder
* dec
= opus_decoder_create(48000, 1, &err
);
//NOTE(review): this outer p is shadowed by the p declared inside the packet loop below and does
//not appear to be used on any visible line.
831 std::vector
<unsigned char> p
;
832 float tmp
[OPUS_MAX_OUT
];
834 write64ule(header
, 0x1C586F532EULL
); //Magic and header size.
835 write64ule(header
+ 16, 4676829883349860352ULL); //Sampling rate.
836 write32ule(header
+ 24, 1);
//Amount of pregap accounted for so far, across packets.
838 uint32_t lookahead_thrown
= 0;
//First header write; the sample count at header + 8 is filled in at the end.
839 data
.write(header
, 32);
841 opus_decoder_destroy(dec
);
842 throw std::runtime_error("Error writing PCM data.");
844 float lgain
= get_gain_linear();
845 for(size_t i
= 0; i
< packets
.size(); i
++) {
846 char blank
[4] = {0, 0, 0, 0};
847 std::vector
<unsigned char> p
;
850 } catch(std::exception
& e
) {
851 opus_decoder_destroy(dec
);
852 (stringfmt() << "Error reading opus packet: " << e
.what()).throwex();
854 uint32_t len
= packet_length(i
);
855 int r
= opus_decode_float(dec
, &p
[0], p
.size(), tmp
, OPUS_MAX_OUT
, 0);
856 bool is_last
= (i
== packets
.size() - 1);
857 uint32_t pregap_throw
= 0;
858 uint32_t postgap_throw
= 0;
860 opus_decoder_destroy(dec
);
861 (stringfmt() << "Error decoding opus packet: " << opus_strerror(r
)).throwex();
863 if(lookahead_thrown
< pregap_length
) {
864 //We haven't yet thrown the full pregap. Throw some.
865 uint32_t maxthrow
= pregap_length
- lookahead_thrown
;
866 pregap_throw
= min(len
, maxthrow
);
//NOTE(review): this adds the whole pregap_length, not the pregap_throw samples actually dropped
//from this packet. If the pregap is longer than the first packet, lookahead_thrown reaches
//pregap_length after one packet and the remainder of the pregap is never discarded.
//"lookahead_thrown += pregap_throw" looks like what was intended (opus_playback_stream::decode_block
//accumulates the amount actually thrown) -- confirm and fix.
867 lookahead_thrown
+= pregap_length
;
//Postgap to drop from this packet, capped to what is left after the pregap drop.
//Presumably applied only when is_last is set; the guarding line is not visible here.
870 postgap_throw
= min(len
- pregap_throw
, postgap_length
);
871 tlen
+= (len
- pregap_throw
- postgap_throw
);
872 for(uint32_t j
= pregap_throw
; j
< len
- postgap_throw
; j
++) {
//Scale the gain-adjusted float sample by 268435456 (2^28) and truncate to a 32-bit integer.
873 int32_t s
= (int32_t)(tmp
[j
] * lgain
* 268435456.0);
874 write32sle(blank
, s
);
875 data
.write(blank
, 4);
//NOTE(review): unlike the other error paths in this function, no opus_decoder_destroy(dec) is
//visible before this throw, so the decoder looks leaked on this path -- confirm.
877 throw std::runtime_error("Error writing PCM data.");
//Rewind and rewrite the header now that the total sample count is known.
880 data
.seekp(0, std::ios_base::beg
);
881 write64ule(header
+ 8, tlen
);
882 data
.write(header
, 32);
884 opus_decoder_destroy(dec
);
885 throw std::runtime_error("Error writing PCM data.");
887 opus_decoder_destroy(dec
);
890 void opus_stream::export_stream(std::ofstream
& data
, external_stream_format extfmt
)
892 if(extfmt
== EXTFMT_OPUSDEMO
)
893 export_stream_opusdemo(data
);
894 else if(extfmt
== EXTFMT_OGGOPUS
)
895 export_stream_oggopus(data
);
896 else if(extfmt
== EXTFMT_SOX
)
897 export_stream_sox(data
);
900 void opus_stream::write(uint8_t len
, const unsigned char* payload
, size_t payload_len
)
904 uint32_t used_cluster
, used_offset
;
905 uint32_t used_mcluster
, used_moffset
;
907 next_cluster
= data_cluster
= fs
.allocate_cluster();
909 next_mcluster
= ctrl_cluster
= fs
.allocate_cluster();
910 write16ube(descriptor
, payload_len
);
911 write8ube(descriptor
+ 2, len
);
912 write8ube(descriptor
+ 3, 1);
913 fs
.write_data(next_cluster
, next_offset
, payload
, payload_len
, used_cluster
, used_offset
);
914 fs
.write_data(next_mcluster
, next_moffset
, descriptor
, 4, used_mcluster
, used_moffset
);
915 uint64_t off
= static_cast<uint64_t>(used_cluster
) * CLUSTER_SIZE
+ used_offset
;
916 opus_packetinfo
p(payload_len
, len
, off
);
917 total_len
+= p
.length();
918 packets
.push_back(p
);
919 } catch(std::exception
& e
) {
920 (stringfmt() << "Can't write opus packet: " << e
.what()).throwex();
924 void opus_stream::write_trailier()
928 uint32_t used_mcluster
, used_moffset
;
929 //The allocation must be done for real.
931 next_mcluster
= ctrl_cluster
= fs
.allocate_cluster();
932 //But the write must not update the pointers..
933 uint32_t tmp_mcluster
= next_mcluster
;
934 uint32_t tmp_moffset
= next_moffset
;
935 write32ube(descriptor
, 0);
936 write32ube(descriptor
+ 4, (pregap_length
<< 8) | 0x02);
937 write32ube(descriptor
+ 8, (postgap_length
<< 8) | 0x03);
938 write16sbe(descriptor
+ 12, gain
);
939 write16ube(descriptor
+ 14, 0x0004);
940 fs
.write_data(tmp_mcluster
, tmp_moffset
, descriptor
, 16, used_mcluster
, used_moffset
);
941 } catch(std::exception
& e
) {
942 (stringfmt() << "Can't write stream trailer: " << e
.what()).throwex();
947 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
948 //Playing opus stream.
949 struct opus_playback_stream
951 //Create a new playing stream from given opus stream.
952 opus_playback_stream(opus_stream
& data
);
953 //Destroy playing opus stream.
954 ~opus_playback_stream();
955 //Read samples from stream.
957 void read(float* data
, size_t samples
);
958 //Skip samples from stream.
960 void skip(uint64_t samples
);
961 //Has the stream already ended?
964 opus_playback_stream(const opus_playback_stream
&);
965 opus_playback_stream
& operator=(const opus_playback_stream
&);
968 float output
[OPUS_MAX_OUT
];
969 unsigned output_left
;
970 uint32_t pregap_thrown
;
972 OpusDecoder
* decoder
;
978 opus_playback_stream::opus_playback_stream(opus_stream
& data
)
987 postgap_thrown
= false;
988 blocks
= stream
.blocks();
989 decoder
= opus_decoder_create(OPUS_SAMPLERATE
, 1, &err
);
991 throw std::bad_alloc();
994 opus_playback_stream::~opus_playback_stream()
996 //No, we don't unlock the stream.
998 opus_decoder_destroy(decoder
);
1001 bool opus_playback_stream::eof()
1003 return (next_block
>= blocks
&& !output_left
);
1006 void opus_playback_stream::decode_block()
1008 if(next_block
>= blocks
)
1010 if(output_left
>= OPUS_MAX_OUT
)
1012 unsigned plen
= stream
.packet_length(next_block
);
1013 if(plen
+ output_left
> OPUS_MAX_OUT
)
1015 std::vector
<unsigned char> pdata
= stream
.packet(next_block
);
1016 int c
= opus_decode_float(decoder
, &pdata
[0], pdata
.size(), output
+ output_left
,
1017 OPUS_MAX_OUT
- output_left
, 0);
1019 output_left
= min(output_left
+ c
, static_cast<unsigned>(OPUS_MAX_OUT
));
1021 //Bad packet, insert silence.
1022 for(unsigned i
= 0; i
< plen
; i
++)
1023 output
[output_left
++] = 0;
1025 //Throw the pregap away if needed.
1026 if(pregap_thrown
< stream
.get_pregap()) {
1027 uint32_t throw_amt
= min(stream
.get_pregap() - pregap_thrown
, (uint32_t)output_left
);
1028 if(throw_amt
&& throw_amt
< output_left
)
1029 memmove(output
, output
+ throw_amt
, (output_left
- throw_amt
) * sizeof(float));
1030 output_left
-= throw_amt
;
1031 pregap_thrown
+= throw_amt
;
1036 void opus_playback_stream::read(float* data
, size_t samples
)
1038 float lgain
= stream
.get_gain_linear();
1039 while(samples
> 0) {
1041 if(next_block
>= blocks
&& !postgap_thrown
) {
1042 //This is the final packet. Throw away postgap samples at the end.
1043 uint32_t thrown
= min(stream
.get_postgap(), (uint32_t)output_left
);
1044 output_left
-= thrown
;
1045 postgap_thrown
= true;
1047 if(next_block
>= blocks
&& !output_left
) {
1048 //Zerofill remainder.
1049 for(size_t i
= 0; i
< samples
; i
++)
1053 unsigned maxcopy
= min(static_cast<unsigned>(samples
), output_left
);
1055 memcpy(data
, output
, maxcopy
* sizeof(float));
1056 for(size_t i
= 0; i
< maxcopy
; i
++)
1059 if(maxcopy
< output_left
&& maxcopy
)
1060 memmove(output
, output
+ maxcopy
, (output_left
- maxcopy
) * sizeof(float));
1061 output_left
-= maxcopy
;
1067 void opus_playback_stream::skip(uint64_t samples
)
1069 //Adjust for preskip and declare all preskip already thrown away.
1070 pregap_thrown
= stream
.get_pregap();
1071 samples
+= pregap_thrown
;
1072 postgap_thrown
= false;
1073 //First, skip inside decoded samples.
1074 if(samples
< output_left
) {
1075 //Skipping less than amount in output buffer. Just discard from output buffer and try
1076 //to decode a new block.
1077 memmove(output
, output
+ samples
, (output_left
- samples
) * sizeof(float));
1078 output_left
-= samples
;
1082 //Skipping at least the amount of samples in output buffer. First, blank the output buffer
1083 //and count those towards samples discarded.
1084 samples
-= output_left
;
1087 //While number of samples is so great that adequate convergence period can be ensured without
1088 //decoding this packet, just skip the samples from the packet.
1089 while(samples
> OPUS_CONVERGE_MAX
) {
1090 samples
-= stream
.packet_length(next_block
++);
1092 if(next_block
>= blocks
)
1095 //Okay, we are near the point. Start decoding packets.
1096 while(samples
> 0) {
1099 if(next_block
>= blocks
&& !output_left
)
1101 //Skip as many samples as possible.
1102 unsigned maxskip
= min(static_cast<unsigned>(samples
), output_left
);
1103 if(maxskip
< output_left
)
1104 memmove(output
, output
+ maxskip
, (output_left
- maxskip
) * sizeof(float));
1105 output_left
-= maxskip
;
1108 //Just to be nice, decode an extra block.
1113 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1114 //Collection of streams.
1115 struct stream_collection
1118 //Create a new collection.
1120 stream_collection(filesystem_ref filesys
);
1121 //Destroy a collection. All streams are destroyed but not deleted.
1122 ~stream_collection();
1123 //Get list of streams active at given point.
1124 std::list
<uint64_t> streams_at(uint64_t point
);
1125 //Add a stream into collection.
1127 uint64_t add_stream(opus_stream
& stream
);
1128 //Get the filesystem this collection is for.
1129 filesystem_ref
get_filesystem() { return fs
; }
1130 //Unlock all streams in collection.
1132 //Get stream with given index (NULL if not found).
1133 opus_stream
* get_stream(uint64_t index
)
1135 umutex_class
m(mutex
);
1136 if(streams
.count(index
)) {
1137 streams
[index
]->get_ref();
1138 return streams
[index
];
1144 void delete_stream(uint64_t index
);
1145 //Alter stream timebase.
1147 void alter_stream_timebase(uint64_t index
, uint64_t newts
);
1148 //Enumerate all valid stream indices, in time order.
1149 std::list
<uint64_t> all_streams();
1150 //Export the entire superstream.
1152 void export_superstream(std::ofstream
& out
);
1155 uint64_t next_index
;
1156 unsigned next_stream
;
1158 std::set
<uint64_t> free_indices
;
1159 std::map
<uint64_t, uint64_t> entries
;
1160 std::multimap
<uint64_t, uint64_t> streams_by_time
;
1161 //FIXME: Something more efficient.
1162 std::map
<uint64_t, opus_stream
*> streams
;
1165 stream_collection::stream_collection(filesystem_ref filesys
)
1170 //The stream index table is in cluster 2.
1171 uint32_t next_cluster
= 2;
1172 uint32_t next_offset
= 0;
1177 size_t r
= fs
.read_data(next_cluster
, next_offset
, buffer
, 16);
1180 uint64_t timebase
= read64ube(buffer
);
1181 uint32_t ctrl_cluster
= read32ube(buffer
+ 8);
1182 uint32_t data_cluster
= read32ube(buffer
+ 12);
1184 opus_stream
* x
= new opus_stream(timebase
, fs
, ctrl_cluster
, data_cluster
);
1185 entries
[next_index
] = i
;
1186 streams_by_time
.insert(std::make_pair(timebase
, next_index
));
1187 streams
[next_index
++] = x
;
1189 free_indices
.insert(i
);
1192 } catch(std::exception
& e
) {
1193 for(auto i
: streams
)
1194 i
.second
->put_ref();
1195 (stringfmt() << "Failed to parse LSVS: " << e
.what()).throwex();
1199 stream_collection::~stream_collection()
1201 umutex_class
m(mutex
);
1202 for(auto i
: streams
)
1203 i
.second
->put_ref();
1207 std::list
<uint64_t> stream_collection::streams_at(uint64_t point
)
1209 umutex_class
m(mutex
);
1210 std::list
<uint64_t> s
;
1211 for(auto i
: streams
) {
1212 uint64_t start
= i
.second
->timebase();
1213 uint64_t end
= start
+ i
.second
->length();
1214 if(point
>= start
&& point
< end
) {
1215 i
.second
->get_ref();
1216 s
.push_back(i
.first
);
1222 uint64_t stream_collection::add_stream(opus_stream
& stream
)
1225 umutex_class
m(mutex
);
1226 //Lock the added stream so it doesn't start playing back immediately.
1228 uint64_t idx
= next_index
++;
1229 streams
[idx
] = &stream
;
1231 write64ube(buffer
, stream
.timebase());
1232 auto r
= stream
.get_clusters();
1233 write32ube(buffer
+ 8, r
.first
);
1234 write32ube(buffer
+ 12, r
.second
);
1235 uint64_t entry_number
= 0;
1236 if(free_indices
.empty())
1237 entry_number
= next_stream
++;
1239 entry_number
= *free_indices
.begin();
1240 free_indices
.erase(entry_number
);
1242 uint32_t write_cluster
= 2;
1243 uint32_t write_offset
= 0;
1244 uint32_t dummy1
, dummy2
;
1245 fs
.skip_data(write_cluster
, write_offset
, 16 * entry_number
);
1246 fs
.write_data(write_cluster
, write_offset
, buffer
, 16, dummy1
, dummy2
);
1247 streams_by_time
.insert(std::make_pair(stream
.timebase(), idx
));
1248 entries
[idx
] = entry_number
;
1250 } catch(std::exception
& e
) {
1251 (stringfmt() << "Failed to add stream: " << e
.what()).throwex();
1255 void stream_collection::unlock_all()
1257 umutex_class
m(mutex
);
1258 for(auto i
: streams
)
1262 void stream_collection::delete_stream(uint64_t index
)
1264 umutex_class
m(mutex
);
1265 if(!entries
.count(index
))
1267 uint64_t entry_number
= entries
[index
];
1268 uint32_t write_cluster
= 2;
1269 uint32_t write_offset
= 0;
1270 uint32_t dummy1
, dummy2
;
1271 char buffer
[16] = {0};
1272 fs
.skip_data(write_cluster
, write_offset
, 16 * entry_number
);
1273 fs
.write_data(write_cluster
, write_offset
, buffer
, 16, dummy1
, dummy2
);
1274 auto itr
= streams_by_time
.lower_bound(streams
[index
]->timebase());
1275 auto itr2
= streams_by_time
.upper_bound(streams
[index
]->timebase());
1276 for(auto x
= itr
; x
!= itr2
; x
++)
1277 if(x
->second
== index
) {
1278 streams_by_time
.erase(x
);
1281 streams
[index
]->delete_stream();
1282 streams
.erase(index
);
1285 void stream_collection::alter_stream_timebase(uint64_t index
, uint64_t newts
)
1288 umutex_class
m(mutex
);
1289 if(!streams
.count(index
))
1291 if(entries
.count(index
)) {
1293 uint32_t write_cluster
= 2;
1294 uint32_t write_offset
= 0;
1295 uint32_t dummy1
, dummy2
;
1296 write64ube(buffer
, newts
);
1297 fs
.skip_data(write_cluster
, write_offset
, 16 * entries
[index
]);
1298 fs
.write_data(write_cluster
, write_offset
, buffer
, 8, dummy1
, dummy2
);
1300 auto itr
= streams_by_time
.lower_bound(streams
[index
]->timebase());
1301 auto itr2
= streams_by_time
.upper_bound(streams
[index
]->timebase());
1302 for(auto x
= itr
; x
!= itr2
; x
++)
1303 if(x
->second
== index
) {
1304 streams_by_time
.erase(x
);
1307 streams
[index
]->timebase(newts
);
1308 streams_by_time
.insert(std::make_pair(newts
, index
));
1309 } catch(std::exception
& e
) {
1310 (stringfmt() << "Failed to alter stream timebase: " << e
.what()).throwex();
1314 std::list
<uint64_t> stream_collection::all_streams()
1316 umutex_class
m(mutex
);
1317 std::list
<uint64_t> s
;
1318 for(auto i
: streams_by_time
)
1319 s
.push_back(i
.second
);
1323 void stream_collection::export_superstream(std::ofstream
& out
)
1325 std::list
<uint64_t> slist
= all_streams();
1326 //Find the total length of superstream.
1328 for(auto i
: slist
) {
1329 opus_stream
* s
= get_stream(i
);
1331 len
= max(len
, s
->timebase() + s
->length());
1336 write64ule(header
, 0x1C586F532EULL
); //Magic and header size.
1337 write64ule(header
+ 8, len
);
1338 write64ule(header
+ 16, 4676829883349860352ULL); //Sampling rate.
1339 write64ule(header
+ 24, 1);
1340 out
.write(header
, 32);
1342 throw std::runtime_error("Error writing PCM output");
1344 //Find the first valid stream.
1345 auto next_i
= slist
.begin();
1346 opus_stream
* next_stream
= NULL
;
1347 while(next_i
!= slist
.end()) {
1348 next_stream
= get_stream(*next_i
);
1354 next_ts
= next_stream
? next_stream
->timebase() : len
;
1356 std::list
<opus_playback_stream
*> active
;
1358 for(uint64_t s
= 0; s
< len
;) {
1360 active
.push_back(new opus_playback_stream(*next_stream
));
1361 next_stream
->put_ref();
1363 while(next_i
!= slist
.end()) {
1364 next_stream
= get_stream(*next_i
);
1368 uint64_t next_ts
= next_stream
->timebase();
1371 //Okay, this starts too...
1372 active
.push_back(new opus_playback_stream(*next_stream
));
1373 next_stream
->put_ref();
1376 next_ts
= next_stream
? next_stream
->timebase() : len
;
1378 uint64_t maxsamples
= min(next_ts
- s
, static_cast<uint64_t>(OUTPUT_BLOCK
));
1379 maxsamples
= min(maxsamples
, len
- s
);
1380 char outbuf
[4 * OUTPUT_BLOCK
];
1381 float buf1
[OUTPUT_BLOCK
];
1382 float buf2
[OUTPUT_BLOCK
];
1383 for(size_t t
= 0; t
< maxsamples
; t
++)
1385 for(auto t
: active
) {
1386 t
->read(buf2
, maxsamples
);
1387 for(size_t u
= 0; u
< maxsamples
; u
++)
1390 for(auto t
= active
.begin(); t
!= active
.end();) {
1395 active
.erase(todel
);
1399 for(size_t t
= 0; t
< maxsamples
; t
++)
1400 write32sle(outbuf
+ 4 * t
, buf1
[t
] * 268435456);
1401 out
.write(outbuf
, 4 * maxsamples
);
1403 throw std::runtime_error("Failed to write PCM");
1406 } catch(std::exception
& e
) {
1407 (stringfmt() << "Failed to export PCM: " << e
.what()).throwex();
1409 for(auto t
= active
.begin(); t
!= active
.end();) {
1414 active
.erase(todelete
);
1420 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1421 void start_management_stream(opus_stream
& s
)
1423 opus_playback_stream
* p
= new opus_playback_stream(s
);
1424 umutex_class
m(active_playback_streams_lock
);
1425 active_playback_streams
.push_back(p
);
1428 void advance_time(uint64_t newtime
)
1430 umutex_class
m2(current_collection_lock
);
1431 if(!current_collection
) {
1433 umutex_class
m(active_playback_streams_lock
);
1434 for(auto i
: active_playback_streams
)
1436 active_playback_streams
.clear();
1439 std::list
<uint64_t> sactive
= current_collection
->streams_at(newtime
);
1440 for(auto j
: sactive
) {
1441 opus_stream
* i
= current_collection
->get_stream(j
);
1444 //Don't play locked streams in order to avoid double playing.
1445 umutex_class
m(active_playback_streams_lock
);
1448 active_playback_streams
.push_back(new opus_playback_stream(*i
));
1449 } catch(std::exception
& e
) {
1450 messages
<< "Can't start stream: " << e
.what() << std::endl
;
1456 void jump_time(uint64_t newtime
)
1458 umutex_class
m2(current_collection_lock
);
1459 if(!current_collection
) {
1461 umutex_class
m(active_playback_streams_lock
);
1462 for(auto i
: active_playback_streams
)
1464 active_playback_streams
.clear();
1467 //Close all currently playing streams.
1469 umutex_class
m(active_playback_streams_lock
);
1470 for(auto i
: active_playback_streams
)
1472 active_playback_streams
.clear();
1474 //Unlock all streams, so they will play.
1475 current_collection
->unlock_all();
1476 //Reopen all streams that should be open (with seeking)
1477 std::list
<uint64_t> sactive
= current_collection
->streams_at(newtime
);
1478 for(auto j
: sactive
) {
1479 opus_stream
* i
= current_collection
->get_stream(j
);
1482 //No need to check for locks, because we just busted all of those.
1483 uint64_t p
= newtime
- i
->timebase();
1484 opus_playback_stream
* s
;
1486 s
= new opus_playback_stream(*i
);
1487 } catch(std::exception
& e
) {
1488 messages
<< "Can't start stream: " << e
.what() << std::endl
;
1494 umutex_class
m(active_playback_streams_lock
);
1495 active_playback_streams
.push_back(s
);
1500 void do_resample(audioapi_resampler
& r
, float* srcbuf
, size_t& srcuse
, float* dstbuf
, size_t& dstuse
,
1501 size_t dstmax
, double ratio
)
1503 if(srcuse
== 0 || dstuse
>= dstmax
)
1506 size_t in_u
= srcuse
;
1507 float* out
= dstbuf
+ dstuse
;
1508 size_t out_u
= dstmax
- dstuse
;
1509 r
.resample(in
, in_u
, out
, out_u
, ratio
, false);
1510 size_t offset
= in
- srcbuf
;
1512 memmove(srcbuf
, srcbuf
+ offset
, sizeof(float) * (srcuse
- offset
));
1514 dstuse
= dstmax
- out_u
;
1517 //Drain the input buffer.
1520 while(audioapi_voice_r_status() > 0) {
1522 unsigned size
= min(audioapi_voice_r_status(), 256u);
1523 audioapi_record_voice(buf
, size
);
1527 //Read the input buffer.
1528 void read_input(float* buf
, size_t& use
, size_t maxuse
)
1530 size_t rleft
= audioapi_voice_r_status();
1531 unsigned toread
= min(rleft
, max(maxuse
, use
) - use
);
1533 audioapi_record_voice(buf
+ use
, toread
);
1538 //Compress Opus block.
1539 void compress_opus_block(OpusEncoder
* e
, float* buf
, size_t& use
, opus_stream
& active_stream
,
1540 double& total_compressed
, double& total_blocks
)
1542 const size_t opus_out_max
= 1276;
1543 unsigned char opus_output
[opus_out_max
];
1554 return; //No valid data to compress.
1556 int c
= opus_encode_float(e
, buf
, cblock
, opus_output
, opus_out_max
);
1558 //Successfully compressed a block.
1559 size_t opus_output_len
= c
;
1560 total_compressed
+= c
;
1563 active_stream
.write(cblock
/ 120, opus_output
, opus_output_len
);
1564 } catch(std::exception
& e
) {
1565 messages
<< "Error writing data: " << e
.what() << std::endl
;
1568 messages
<< "Error from Opus encoder: " << opus_strerror(c
) << std::endl
;
1574 uint64_t sampletime
;
1577 umutex_class
m(time_mutex
);
1578 sampletime
= current_time
;
1579 jumping
= time_jump
;
1583 jump_time(sampletime
);
1585 advance_time(sampletime
);
1588 void decompress_active_streams(float* out
, size_t& use
)
1591 use
+= OUTPUT_BLOCK
;
1592 for(unsigned i
= 0; i
< OUTPUT_BLOCK
; i
++)
1594 //Do it this way to minimize the amount of time playback streams lock
1596 std::list
<opus_playback_stream
*> stmp
;
1598 umutex_class
m(active_playback_streams_lock
);
1599 stmp
= active_playback_streams
;
1601 std::set
<opus_playback_stream
*> toerase
;
1602 for(auto i
: stmp
) {
1603 float tmp
[OUTPUT_BLOCK
];
1605 i
->read(tmp
, OUTPUT_BLOCK
);
1606 } catch(std::exception
& e
) {
1607 messages
<< "Failed to decompress: " << e
.what() << std::endl
;
1608 for(unsigned j
= 0; j
< OUTPUT_BLOCK
; j
++)
1611 for(unsigned j
= 0; j
< OUTPUT_BLOCK
; j
++)
1612 out
[j
+ base
] += tmp
[j
];
1617 umutex_class
m(active_playback_streams_lock
);
1618 for(auto i
= active_playback_streams
.begin(); i
!= active_playback_streams
.end();) {
1619 if(toerase
.count(*i
)) {
1623 active_playback_streams
.erase(toerase
);
1630 void handle_tangent_positive_edge(OpusEncoder
* e
, opus_stream
*& active_stream
,
1631 double& total_compressed
, double& total_blocks
)
1633 umutex_class
m2(current_collection_lock
);
1634 if(!current_collection
)
1636 static unsigned output_seq
= 0;
1637 opus_encoder_ctl(e
, OPUS_RESET_STATE
);
1638 total_compressed
= 0;
1642 umutex_class
m(time_mutex
);
1643 ctime
= current_time
;
1645 active_stream
= NULL
;
1647 active_stream
= new opus_stream(ctime
, current_collection
->get_filesystem());
1649 opus_encoder_ctl(e
, OPUS_GET_LOOKAHEAD(&pregap
));
1650 active_stream
->set_pregap(pregap
);
1651 } catch(std::exception
& e
) {
1652 messages
<< "Can't start stream: " << e
.what() << std::endl
;
1655 messages
<< "Tangent positive edge." << std::endl
;
1658 void handle_tangent_negative_edge(opus_stream
*& active_stream
, double total_compressed
,
1659 double total_blocks
)
1661 umutex_class
m2(current_collection_lock
);
1662 messages
<< "Tangent negative edge. "
1663 << total_compressed
<< " bytes in " << total_blocks
<< " blocks, "
1664 << (0.4 * total_compressed
/ total_blocks
) << " kbps" << std::endl
;
1665 active_stream
->write_trailier();
1666 if(current_collection
) {
1668 current_collection
->add_stream(*active_stream
);
1669 } catch(std::exception
& e
) {
1670 messages
<< "Can't add stream: " << e
.what() << std::endl
;
1671 active_stream
->put_ref();
1673 information_dispatch::do_voice_stream_change();
1675 active_stream
->put_ref();
1676 active_stream
= NULL
;
1679 class inthread_th
: public worker_thread
1699 } catch(std::bad_alloc
& e
) {
1701 } catch(std::exception
& e
) {
1702 messages
<< "AIEEE... Fatal exception in voice thread: " << e
.what() << std::endl
;
1708 const size_t f
= sizeof(float);
1709 double position
= 0;
1711 OpusEncoder
* oenc
= opus_encoder_create(OPUS_SAMPLERATE
, 1, OPUS_APPLICATION_VOIP
, &err
);
1712 opus_encoder_ctl(oenc
, OPUS_SET_BITRATE(OPUS_BITRATE
));
1713 audioapi_resampler rin
;
1714 audioapi_resampler rout
;
1715 const unsigned buf_max
= 6144; //These buffers better be large.
1716 size_t buf_in_use
= 0;
1717 size_t buf_inr_use
= 0;
1718 size_t buf_outr_use
= 0;
1719 size_t buf_out_use
= 0;
1720 float buf_in
[buf_max
];
1721 float buf_inr
[OPUS_BLOCK_SIZE
];
1722 float buf_outr
[OUTPUT_SIZE
];
1723 float buf_out
[buf_max
];
1724 double total_compressed
= 0;
1725 double total_blocks
= 0;
1726 opus_stream
* active_stream
= NULL
;
1730 uint64_t ticks
= get_utime();
1731 //Handle tangent edges.
1732 if(active_flag
&& !active_stream
) {
1736 handle_tangent_positive_edge(oenc
, active_stream
, total_compressed
,
1739 else if((!active_flag
|| quit
) && active_stream
)
1740 handle_tangent_negative_edge(active_stream
, total_compressed
, total_blocks
);
1744 //Read input, up to 25ms.
1745 unsigned rate
= audioapi_voice_rate();
1746 size_t dbuf_max
= min(buf_max
, rate
/ REC_THRESHOLD_DIV
);
1747 read_input(buf_in
, buf_in_use
, dbuf_max
);
1749 //Resample up to full opus block.
1750 do_resample(rin
, buf_in
, buf_in_use
, buf_inr
, buf_inr_use
, OPUS_BLOCK_SIZE
,
1751 1.0 * OPUS_SAMPLERATE
/ rate
);
1753 //If we have full opus block and recording is enabled, compress it.
1754 if(buf_inr_use
>= OPUS_BLOCK_SIZE
&& active_stream
)
1755 compress_opus_block(oenc
, buf_inr
, buf_inr_use
, *active_stream
,
1756 total_compressed
, total_blocks
);
1758 //Update time, starting/ending streams.
1761 //Decompress active streams.
1762 if(buf_outr_use
< BLOCK_THRESHOLD
)
1763 decompress_active_streams(buf_outr
, buf_outr_use
);
1765 //Resample to output rate.
1766 do_resample(rout
, buf_outr
, buf_outr_use
, buf_out
, buf_out_use
, buf_max
,
1767 1.0 * rate
/ OPUS_SAMPLERATE
);
1770 if(buf_out_use
> 0 && audioapi_voice_p_status2() < rate
/ PLAY_THRESHOLD_DIV
) {
1771 audioapi_play_voice(buf_out
, buf_out_use
);
1775 //Sleep a bit to save CPU use.
1776 uint64_t ticks_spent
= get_utime() - ticks
;
1777 if(ticks_spent
< ITERATION_TIME
)
1778 usleep(ITERATION_TIME
- ticks_spent
);
1780 opus_encoder_destroy(oenc
);
1781 delete current_collection
;
1787 volatile bool quit_ack
;
1790 //The tangent function.
1791 function_ptr_command
<> ptangent("+tangent", "Voice tangent",
1792 "Syntax: +tangent\nVoice tangent.\n",
1793 []() throw(std::bad_alloc
, std::runtime_error
) {
1796 function_ptr_command
<> ntangent("-tangent", "Voice tangent",
1797 "Syntax: -tangent\nVoice tangent.\n",
1798 []() throw(std::bad_alloc
, std::runtime_error
) {
1799 active_flag
= false;
1802 inthread_th
* int_task
;
1806 void voice_frame_number(uint64_t newframe
, double rate
)
1808 if(rate
== last_rate
&& last_frame_number
== newframe
)
1810 umutex_class
m(time_mutex
);
1811 current_time
= newframe
/ rate
* OPUS_SAMPLERATE
;
1812 if(fabs(rate
- last_rate
) > 1e-6 || last_frame_number
+ 1 != newframe
)
1814 last_frame_number
= newframe
;
1818 void voicethread_task()
1820 int_task
= new inthread_th
;
1823 void voicethread_kill()
1829 uint64_t voicesub_parse_timebase(const std::string
& n
)
1832 if(x
.length() > 0 && x
[x
.length() - 1] == 's') {
1833 x
= x
.substr(0, x
.length() - 1);
1834 return 48000 * parse_value
<double>(x
);
1836 return parse_value
<uint64_t>(x
);
1841 function_ptr_command
<> list_streams("list-streams", "List streams ", "list-streams\nList known voice streams",
1842 []() throw(std::bad_alloc
, std::runtime_error
) {
1843 umutex_class
m2(current_collection_lock
);
1844 if(!current_collection
) {
1845 messages
<< "No voice streams loaded." << std::endl
;
1848 messages
<< "-----------------------" << std::endl
;
1849 for(auto i
: current_collection
->all_streams()) {
1850 opus_stream
* s
= current_collection
->get_stream(i
);
1853 messages
<< "ID #" << i
<< ": base=" << s
->timebase() << " ("
1854 << (s
->timebase() / 48000.0) << "s), length=" << s
->length() << " ("
1855 << (s
->length() / 48000.0) << "s)" << std::endl
;
1858 messages
<< "-----------------------" << std::endl
;
1861 function_ptr_command
<const std::string
&> delete_stream("delete-stream", "Delete a stream",
1862 "delete-stream <id>\nDelete a voice stream with given ID.",
1863 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
1864 umutex_class
m2(current_collection_lock
);
1865 uint64_t id
= parse_value
<uint64_t>(x
);
1866 if(!current_collection
) {
1867 messages
<< "No voice streams loaded." << std::endl
;
1870 opus_stream
* s
= current_collection
->get_stream(id
);
1872 messages
<< "Error, no such stream found." << std::endl
;
1876 current_collection
->delete_stream(id
);
1877 information_dispatch::do_voice_stream_change();
1878 messages
<< "Deleted stream #" << id
<< "." << std::endl
;
1881 function_ptr_command
<const std::string
&> play_stream("play-stream", "Play a stream", "play-stream <id>\n"
1882 "Play a voice stream with given ID.",
1883 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
1884 umutex_class
m2(current_collection_lock
);
1885 uint64_t id
= parse_value
<uint64_t>(x
);
1886 if(!current_collection
) {
1887 messages
<< "No voice streams loaded." << std::endl
;
1890 opus_stream
* s
= current_collection
->get_stream(id
);
1892 messages
<< "Error, no such stream found." << std::endl
;
1896 start_management_stream(*s
);
1902 messages
<< "Playing stream #" << id
<< "." << std::endl
;
1905 function_ptr_command
<const std::string
&> change_timebase("change-timebase", "Change stream timebase",
1906 "change-timebase <id> <newbase>\nChange timebase of given stream",
1907 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
1908 umutex_class
m2(current_collection_lock
);
1909 if(!current_collection
) {
1910 messages
<< "No voice streams loaded." << std::endl
;
1913 auto r
= regex("([0-9]+)[ \t]+([^ \t]*)", x
);
1915 messages
<< "Syntax: change-timebase <id> <timebase>" << std::endl
;
1918 uint64_t id
= parse_value
<uint64_t>(r
[1]);
1919 uint64_t tbase
= voicesub_parse_timebase(r
[2]);
1920 opus_stream
* s
= current_collection
->get_stream(id
);
1922 messages
<< "Error, no such stream found." << std::endl
;
1926 current_collection
->alter_stream_timebase(id
, tbase
);
1927 information_dispatch::do_voice_stream_change();
1928 messages
<< "Timebase of stream #" << id
<< " is now " << (tbase
/ 48000.0) << "s"
1932 void import_cmd_common(const std::string
& x
, const char* postfix
, external_stream_format mode
)
1934 umutex_class
m2(current_collection_lock
);
1935 if(!current_collection
) {
1936 messages
<< "No voice streams loaded." << std::endl
;
1939 auto r
= regex("([^ \t]+)[ \t]+(.+)", x
);
1941 messages
<< "Syntax: import-stream-" << postfix
<< " <timebase> <filename>" << std::endl
;
1944 uint64_t tbase
= voicesub_parse_timebase(r
[1]);
1945 std::string fname
= r
[2];
1946 std::ifstream
s(fname
, std::ios_base::in
| std::ios_base::binary
);
1948 messages
<< "Can't open '" << fname
<< "'" << std::endl
;
1951 opus_stream
* st
= new opus_stream(tbase
, current_collection
->get_filesystem(), s
, mode
);
1954 id
= current_collection
->add_stream(*st
);
1956 st
->delete_stream();
1959 st
->unlock(); //Not locked.
1960 information_dispatch::do_voice_stream_change();
1961 messages
<< "Imported stream (" << st
->length() / 48000.0 << "s) as ID #" << id
<< std::endl
;
1964 function_ptr_command
<const std::string
&> import_stream_c("import-stream-opus", "Import a opus stream",
1965 "import-stream-opus <timebase> <filename>\nImport opus stream from <filename>, starting at "
1967 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
1968 import_cmd_common(x
, "opus", EXTFMT_OPUSDEMO
);
1971 function_ptr_command
<const std::string
&> import_stream_p("import-stream-pcm", "Import a PCM stream",
1972 "import-stream-pcm <timebase> <filename>\nImport PCM stream from <filename>, starting at <timebase>",
1973 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
1974 import_cmd_common(x
, "pcm", EXTFMT_SOX
);
1977 function_ptr_command
<const std::string
&> import_stream_o("import-stream-ogg", "Import a OggOpus stream",
1978 "import-stream-ogg <timebase> <filename>\nImport OggOpus stream from <filename>, starting at "
1980 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
1981 import_cmd_common(x
, "ogg", EXTFMT_OGGOPUS
);
1984 void export_cmd_common(const std::string
& x
, const char* postfix
, external_stream_format mode
)
1986 umutex_class
m2(current_collection_lock
);
1987 if(!current_collection
) {
1988 messages
<< "No voice streams loaded." << std::endl
;
1991 auto r
= regex("([0-9]+)[ \t]+(.+)", x
);
1993 messages
<< "Syntax: export-stream-" << postfix
<< " <id> <filename>" << std::endl
;
1996 uint64_t id
= parse_value
<uint64_t>(r
[1]);
1997 std::string fname
= r
[2];
1998 std::ofstream
s(fname
, std::ios_base::out
| std::ios_base::binary
);
2000 messages
<< "Can't open '" << fname
<< "'" << std::endl
;
2003 opus_stream
* st
= current_collection
->get_stream(id
);
2005 messages
<< "Error, stream #" << id
<< " does not exist." << std::endl
;
2009 st
->export_stream(s
, mode
);
2010 messages
<< "Exported stream #" << id
<< " (" << st
->length() / 48000.0 << "s)" << std::endl
;
2011 } catch(std::exception
& e
) {
2012 messages
<< "Export failed: " << e
.what();
2017 function_ptr_command
<const std::string
&> export_stream_c("export-stream-opus", "Export a opus stream",
2018 "export-stream-opus <id> <filename>\nExport opus stream <id> to <filename>",
2019 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
2020 export_cmd_common(x
, "opus", EXTFMT_OPUSDEMO
);
2023 function_ptr_command
<const std::string
&> export_stream_p("export-stream-pcm", "Export a PCM stream",
2024 "export-stream-pcm <id> <filename>\nExport PCM stream <id> to <filename>",
2025 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
2026 export_cmd_common(x
, "pcm", EXTFMT_SOX
);
2029 function_ptr_command
<const std::string
&> export_stream_o("export-stream-ogg", "Export a OggOpus stream",
2030 "export-stream-ogg <id> <filename>\nExport OggOpus stream <id> to <filename>",
2031 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
2032 export_cmd_common(x
, "ogg", EXTFMT_OGGOPUS
);
2035 function_ptr_command
<const std::string
&> export_sstream("export-superstream", "Export superstream",
2036 "export-superstream <filename>\nExport PCM superstream to <filename>",
2037 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
2038 umutex_class
m2(current_collection_lock
);
2039 if(!current_collection
)
2041 std::ofstream
s(x
, std::ios_base::out
| std::ios_base::binary
);
2043 messages
<< "Can't open '" << x
<< "'" << std::endl
;
2046 current_collection
->export_superstream(s
);
2047 messages
<< "Superstream exported." << std::endl
;
2050 function_ptr_command
<const std::string
&> load_collection("load-collection", "Load voice subtitling "
2051 "collection", "load-collection <filename>\nLoad voice subtitling collection from <filename>",
2052 [](const std::string
& x
) throw(std::bad_alloc
, std::runtime_error
) {
2053 umutex_class
m2(current_collection_lock
);
2054 filesystem_ref newfs
;
2055 stream_collection
* newc
;
2057 newfs
= filesystem_ref(x
);
2058 newc
= new stream_collection(newfs
);
2059 } catch(std::exception
& e
) {
2060 messages
<< "Can't load '" << x
<< "': " << e
.what();
2063 if(current_collection
)
2064 delete current_collection
;
2065 current_collection
= newc
;
2066 information_dispatch::do_voice_stream_change();
2067 messages
<< "Loaded '" << x
<< "'" << std::endl
;
2070 function_ptr_command
<> unload_collection("unload-collection", "Unload voice subtitling collection",
2071 "unload-collection\nUnload voice subtitling collection",
2072 []() throw(std::bad_alloc
, std::runtime_error
) {
2073 umutex_class
m2(current_collection_lock
);
2074 if(current_collection
)
2075 delete current_collection
;
2076 current_collection
= NULL
;
2077 information_dispatch::do_voice_stream_change();
2078 messages
<< "Collection unloaded" << std::endl
;
2081 inverse_key
itangent("+tangent", "Movie‣Voice tangent");
2084 bool voicesub_collection_loaded()
2086 umutex_class
m2(current_collection_lock
);
2087 return (current_collection
!= NULL
);
2090 std::list
<playback_stream_info
> voicesub_get_stream_info()
2092 umutex_class
m2(current_collection_lock
);
2093 std::list
<playback_stream_info
> in
;
2094 if(!current_collection
)
2096 for(auto i
: current_collection
->all_streams()) {
2097 opus_stream
* s
= current_collection
->get_stream(i
);
2098 playback_stream_info pi
;
2102 pi
.base
= s
->timebase();
2103 pi
.length
= s
->length();
2113 void voicesub_play_stream(uint64_t id
)
2115 umutex_class
m2(current_collection_lock
);
2116 if(!current_collection
)
2117 throw std::runtime_error("No collection loaded");
2118 opus_stream
* s
= current_collection
->get_stream(id
);
2122 start_management_stream(*s
);
2130 void voicesub_export_stream(uint64_t id
, const std::string
& filename
, external_stream_format fmt
)
2132 umutex_class
m2(current_collection_lock
);
2133 if(!current_collection
)
2134 throw std::runtime_error("No collection loaded");
2135 opus_stream
* st
= current_collection
->get_stream(id
);
2138 std::ofstream
s(filename
, std::ios_base::out
| std::ios_base::binary
);
2141 throw std::runtime_error("Can't open output file");
2144 st
->export_stream(s
, fmt
);
2145 } catch(std::exception
& e
) {
2147 (stringfmt() << "Export failed: " << e
.what()).throwex();
2152 uint64_t voicesub_import_stream(uint64_t ts
, const std::string
& filename
, external_stream_format fmt
)
2154 umutex_class
m2(current_collection_lock
);
2155 if(!current_collection
)
2156 throw std::runtime_error("No collection loaded");
2158 std::ifstream
s(filename
, std::ios_base::in
| std::ios_base::binary
);
2160 throw std::runtime_error("Can't open input file");
2161 opus_stream
* st
= new opus_stream(ts
, current_collection
->get_filesystem(), s
, fmt
);
2164 id
= current_collection
->add_stream(*st
);
2166 st
->delete_stream();
2169 st
->unlock(); //Not locked.
2170 information_dispatch::do_voice_stream_change();
2174 void voicesub_delete_stream(uint64_t id
)
2176 umutex_class
m2(current_collection_lock
);
2177 if(!current_collection
)
2178 throw std::runtime_error("No collection loaded");
2179 current_collection
->delete_stream(id
);
2180 information_dispatch::do_voice_stream_change();
2183 void voicesub_export_superstream(const std::string
& filename
)
2185 umutex_class
m2(current_collection_lock
);
2186 if(!current_collection
)
2187 throw std::runtime_error("No collection loaded");
2188 std::ofstream
s(filename
, std::ios_base::out
| std::ios_base::binary
);
2190 throw std::runtime_error("Can't open output file");
2191 current_collection
->export_superstream(s
);
2194 void voicesub_load_collection(const std::string
& filename
)
2196 umutex_class
m2(current_collection_lock
);
2197 filesystem_ref newfs
;
2198 stream_collection
* newc
;
2199 newfs
= filesystem_ref(filename
);
2200 newc
= new stream_collection(newfs
);
2201 if(current_collection
)
2202 delete current_collection
;
2203 current_collection
= newc
;
2204 information_dispatch::do_voice_stream_change();
2207 void voicesub_unload_collection()
2209 umutex_class
m2(current_collection_lock
);
2210 if(current_collection
)
2211 delete current_collection
;
2212 current_collection
= NULL
;
2213 information_dispatch::do_voice_stream_change();
2216 void voicesub_alter_timebase(uint64_t id
, uint64_t ts
)
2218 umutex_class
m2(current_collection_lock
);
2219 if(!current_collection
)
2220 throw std::runtime_error("No collection loaded");
2221 current_collection
->alter_stream_timebase(id
, ts
);
2222 information_dispatch::do_voice_stream_change();
//Convert a timestamp to seconds by dividing it by 48000 (timestamps per second).
//
//Parameter ts: The timestamp to convert.
//Returns: The timestamp in seconds.
double voicesub_ts_seconds(uint64_t ts)
{
	const double units_per_second = 48000.0;
	double seconds = static_cast<double>(ts) / units_per_second;
	return seconds;
}
//No-op variants of the voice thread entry points.
//NOTE(review): presumably the fallback used when voice support is compiled out -- confirm
//against the surrounding preprocessor conditionals.

//Start the voice thread: does nothing.
void voicethread_task()
{
}

//Report the current frame number and frame rate: both values are ignored.
void voice_frame_number(uint64_t newframe, double rate)
{
	(void)newframe;
	(void)rate;
}

//Stop the voice thread: does nothing.
void voicethread_kill()
{
}