Fix some memory leak complaints from Valgrind
[lsnes.git] / src / core / inthread.cpp
blob10b9e77ac5cf651950787ae7884f0858d94a721e
1 #include <lsnes.hpp>
2 #include <cstdint>
3 #include "library/filesystem.hpp"
4 #include "library/minmax.hpp"
5 #include "library/workthread.hpp"
6 #include "library/serialization.hpp"
7 #include "library/string.hpp"
8 #include "library/ogg.hpp"
9 #include "library/opus-ogg.hpp"
10 #include "library/opus.hpp"
11 #include "core/audioapi.hpp"
12 #include "core/command.hpp"
13 #include "core/dispatch.hpp"
14 #include "core/framerate.hpp"
15 #include "core/inthread.hpp"
16 #include "core/keymapper.hpp"
17 #include "core/settings.hpp"
18 #include "core/misc.hpp"
19 #include <cmath>
20 #include <list>
21 #include <iostream>
22 #include <fstream>
23 #include <cstring>
24 #include <unistd.h>
25 #include <sys/time.h>
26 #include <zlib.h>
28 //Farther than this, packets can be fastskipped.
29 #define OPUS_CONVERGE_MAX 5760
30 //Maximum size of PCM output for one packet.
31 #define OPUS_MAX_OUT 5760
32 //Output block size.
33 #define OUTPUT_BLOCK 1440
34 //Main sampling rate.
35 #define OPUS_SAMPLERATE 48000
36 //Opus block size
37 #define OPUS_BLOCK_SIZE 960
38 //Threshold for decoding additional block
39 #define BLOCK_THRESHOLD 1200
40 //Maximum output block size.
41 #define OUTPUT_SIZE (BLOCK_THRESHOLD + OUTPUT_BLOCK)
42 //Amount of microseconds per iteration.
43 #define ITERATION_TIME 15000
44 //Opus bitrate to use.
45 #define OPUS_BITRATE 48000
46 //Opus min bitrate to use.
47 #define OPUS_MIN_BITRATE 8000
48 //Opus max bitrate to use.
49 #define OPUS_MAX_BITRATE 255000
50 //Ogg Opus granule rate.
51 #define OGGOPUS_GRANULERATE 48000
52 //Record buffer size threshold divider.
53 #define REC_THRESHOLD_DIV 40
54 //Playback buffer size threshold divider.
55 #define PLAY_THRESHOLD_DIV 30
56 //Special granule position: None.
57 #define GRANULEPOS_NONE 0xFFFFFFFFFFFFFFFFULL
59 namespace
61 class opus_playback_stream;
62 class opus_stream;
63 class stream_collection;
65 settingvar::variable<settingvar::model_int<OPUS_MIN_BITRATE,OPUS_MAX_BITRATE>> opus_bitrate(lsnes_vset,
66 "opus-bitrate", "commentary‣Bitrate", OPUS_BITRATE);
67 settingvar::variable<settingvar::model_int<OPUS_MIN_BITRATE,OPUS_MAX_BITRATE>> opus_max_bitrate(lsnes_vset,
68 "opus-max-bitrate", "commentary‣Max bitrate", OPUS_MAX_BITRATE);
70 //Recording active flag.
71 volatile bool active_flag = false;
72 //Last seen frame number.
73 uint64_t last_frame_number = 0;
74 //Last seen rate.
75 double last_rate = 0;
76 //Mutex protecting current_time and time_jump.
77 mutex_class time_mutex;
78 //The current time.
79 uint64_t current_time;
80 //Time jump flag. Set if time jump is detected.
81 //If time jump is detected, all current playing streams are stopped, stream locks are cleared and
82 //appropriate streams are restarted. If time jump is false, all unlocked streams coming into range
83 //are started.
84 bool time_jump;
85 //Lock protecting active_playback_streams.
86 mutex_class active_playback_streams_lock;
87 //List of streams currently playing.
88 std::list<opus_playback_stream*> active_playback_streams;
89 //The collection of streams.
90 stream_collection* current_collection;
91 //Lock protecting current collection.
92 mutex_class current_collection_lock;
94 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
95 //Bitrate tracker.
96 struct bitrate_tracker
98 bitrate_tracker() throw();
99 void reset() throw();
100 double get_min() throw();
101 double get_avg() throw();
102 double get_max() throw();
103 double get_length() throw();
104 uint64_t get_bytes() throw();
105 uint64_t get_blocks() throw();
106 void submit(uint32_t bytes, uint32_t samples) throw();
107 private:
108 uint64_t blocks;
109 uint64_t samples;
110 uint64_t bytes;
111 uint32_t minrate;
112 uint32_t maxrate;
115 bitrate_tracker::bitrate_tracker() throw()
117 reset();
120 void bitrate_tracker::reset() throw()
122 blocks = 0;
123 samples = 0;
124 bytes = 0;
125 minrate = std::numeric_limits<uint32_t>::max();
126 maxrate = 0;
129 double bitrate_tracker::get_min() throw()
131 return blocks ? minrate / 1000.0 : 0.0;
134 double bitrate_tracker::get_avg() throw()
136 return samples ? bytes / (125.0 * samples / OPUS_SAMPLERATE) : 0.0;
139 double bitrate_tracker::get_max() throw()
141 return blocks ? maxrate / 1000.0 : 0.0;
144 double bitrate_tracker::get_length() throw()
146 return 1.0 * samples / OPUS_SAMPLERATE;
149 uint64_t bitrate_tracker::get_bytes() throw()
151 return bytes;
154 uint64_t bitrate_tracker::get_blocks() throw()
156 return blocks;
159 void bitrate_tracker::submit(uint32_t _bytes, uint32_t _samples) throw()
161 blocks++;
162 samples += _samples;
163 bytes += _bytes;
164 uint32_t irate = _bytes * 8 * OPUS_SAMPLERATE / OPUS_BLOCK_SIZE;
165 minrate = min(minrate, irate);
166 maxrate = max(maxrate, irate);
169 std::ostream& operator<<(std::ostream& s, bitrate_tracker& t)
171 s << t.get_bytes() << " bytes for " << t.get_length() << "s (" << t.get_blocks() << " blocks)"
172 << std::endl << "Bitrate (kbps): min: " << t.get_min() << " avg: " << t.get_avg() << " max:"
173 << t.get_max() << std::endl;
174 return s;
177 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
178 //Information about individual opus packet in stream.
179 struct opus_packetinfo
181 //Length is in units of 1/400th of a second.
182 opus_packetinfo(uint16_t datasize, uint8_t length, uint64_t offset)
184 descriptor = (offset & 0xFFFFFFFFFFULL) | (static_cast<uint64_t>(length) << 40) |
185 (static_cast<uint64_t>(datasize) << 48);
187 //Get the data size of the packet.
188 uint16_t size() { return descriptor >> 48; }
189 //Calculate the length of packet in samples.
190 uint16_t length() { return 120 * ((descriptor >> 40) & 0xFF); }
191 //Calculate the true offset.
192 uint64_t offset() { return descriptor & 0xFFFFFFFFFFULL; }
193 //Read the packet.
194 //Can throw.
195 std::vector<unsigned char> packet(filesystem::ref from_sys);
196 private:
197 uint64_t descriptor;
200 std::vector<unsigned char> opus_packetinfo::packet(filesystem::ref from_sys)
202 std::vector<unsigned char> ret;
203 uint64_t off = offset();
204 uint32_t sz = size();
205 uint32_t cluster = off / CLUSTER_SIZE;
206 uint32_t coff = off % CLUSTER_SIZE;
207 ret.resize(sz);
208 size_t r = from_sys.read_data(cluster, coff, &ret[0], sz);
209 if(r != sz)
210 throw std::runtime_error("Incomplete read");
211 return ret;
214 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
215 //Information about opus stream.
216 struct opus_stream
218 //Create new empty stream with specified base time.
219 opus_stream(uint64_t base, filesystem::ref filesys);
220 //Read stream with specified base time and specified start clusters.
221 //Can throw.
222 opus_stream(uint64_t base, filesystem::ref filesys, uint32_t ctrl_cluster, uint32_t data_cluster);
223 //Import a stream with specified base time.
224 //Can throw.
225 opus_stream(uint64_t base, filesystem::ref filesys, std::ifstream& data,
226 external_stream_format extfmt);
227 //Delete this stream (also puts a ref)
228 void delete_stream() { deleting = true; put_ref(); }
229 //Export a stream.
230 //Can throw.
231 void export_stream(std::ofstream& data, external_stream_format extfmt);
232 //Get length of specified packet in samples.
233 uint16_t packet_length(uint32_t seqno)
235 return (seqno < packets.size()) ? packets[seqno].length() : 0;
237 //Get data of specified packet.
238 //Can throw.
239 std::vector<unsigned char> packet(uint32_t seqno)
241 return (seqno < packets.size()) ? packets[seqno].packet(fs) : std::vector<unsigned char>();
243 //Get base time in samples for stream.
244 uint64_t timebase() { return s_timebase; }
245 //Set base time in samples for stream.
246 void timebase(uint64_t ts) { s_timebase = ts; }
247 //Get length of stream in samples.
248 uint64_t length()
250 if(pregap_length + postgap_length > total_len)
251 return 0;
252 else
253 return total_len - pregap_length - postgap_length;
255 //Set the pregap length.
256 void set_pregap(uint32_t p) { pregap_length = p; }
257 //Get the pregap length.
258 uint32_t get_pregap() { return pregap_length; }
259 //Set the postgap length.
260 void set_potsgap(uint32_t p) { postgap_length = p; }
261 //Get the postgap length.
262 uint32_t get_postgap() { return postgap_length; }
263 //Set gain.
264 void set_gain(int16_t g) { gain = g; }
265 //Get gain.
266 int16_t get_gain() { return gain; }
267 //Get linear gain.
268 float get_gain_linear() { return pow(10, gain / 20); }
269 //Get number of packets in stream.
270 uint32_t blocks() { return packets.size(); }
271 //Is this stream locked?
272 bool islocked() { return locked; }
273 //Lock a stream.
274 void lock() { locked = true; }
275 //Unlock a stream.
276 void unlock() { locked = false; }
277 //Increment reference count.
278 void get_ref() { umutex_class m(reflock); refcount++; }
279 //Decrement reference count, destroying object if it hits zero.
280 void put_ref() { umutex_class m(reflock); refcount--; if(!refcount) destroy(); }
281 //Add new packet into stream.
282 //Not safe to call simultaneously with packet_length() or packet().
283 //Can throw.
284 void write(uint8_t len, const unsigned char* payload, size_t payload_len);
285 //Write stream trailer.
286 void write_trailier();
287 //Get clusters.
288 std::pair<uint32_t, uint32_t> get_clusters() { return std::make_pair(ctrl_cluster, data_cluster); }
289 private:
290 void export_stream_sox(std::ofstream& data);
291 void export_stream_oggopus(std::ofstream& data);
292 void import_stream_sox(std::ifstream& data);
293 void import_stream_oggopus(std::ifstream& data);
295 opus_stream(const opus_stream&);
296 opus_stream& operator=(const opus_stream&);
297 void destroy();
298 filesystem::ref fs;
299 std::vector<opus_packetinfo> packets;
300 uint64_t total_len;
301 uint64_t s_timebase;
302 uint32_t next_cluster;
303 uint32_t next_offset;
304 uint32_t next_mcluster;
305 uint32_t next_moffset;
306 uint32_t ctrl_cluster;
307 uint32_t data_cluster;
308 uint32_t pregap_length;
309 uint32_t postgap_length;
310 int16_t gain;
311 bool locked;
312 mutex_class reflock;
313 unsigned refcount;
314 bool deleting;
317 opus_stream::opus_stream(uint64_t base, filesystem::ref filesys)
318 : fs(filesys)
320 refcount = 1;
321 deleting = false;
322 total_len = 0;
323 s_timebase = base;
324 locked = false;
325 next_cluster = 0;
326 next_mcluster = 0;
327 next_offset = 0;
328 next_moffset = 0;
329 ctrl_cluster = 0;
330 data_cluster = 0;
331 pregap_length = 0;
332 postgap_length = 0;
333 gain = 0;
336 opus_stream::opus_stream(uint64_t base, filesystem::ref filesys, uint32_t _ctrl_cluster,
337 uint32_t _data_cluster)
338 : fs(filesys)
340 refcount = 1;
341 deleting = false;
342 total_len = 0;
343 s_timebase = base;
344 locked = false;
345 next_cluster = data_cluster = _data_cluster;
346 next_mcluster = ctrl_cluster = _ctrl_cluster;
347 next_offset = 0;
348 next_moffset = 0;
349 pregap_length = 0;
350 postgap_length = 0;
351 gain = 0;
352 //Read the data buffers.
353 char buf[CLUSTER_SIZE];
354 uint32_t last_cluster_seen = next_mcluster;
355 uint64_t total_size = 0;
356 uint64_t total_frames = 0;
357 bool trailers = false;
358 bool saved_pointer_valid = false;
359 uint32_t saved_next_mcluster = 0;
360 uint32_t saved_next_moffset = 0;
361 while(true) {
362 last_cluster_seen = next_mcluster;
363 size_t r = fs.read_data(next_mcluster, next_moffset, buf, CLUSTER_SIZE);
364 if(!r) {
365 //The stream ends here.
366 break;
368 //Find the first unused entry if any.
369 for(unsigned i = 0; i < CLUSTER_SIZE; i += 4)
370 if(!buf[i + 3] || trailers) {
371 //This entry is unused. If the next entry is also unused, that is the end.
372 //Otherwise, there might be stream trailers.
373 if(trailers && !buf[i + 3]) {
374 goto out_parsing; //Ends for real.
376 if(!trailers) {
377 //Set the trailer flag and continue parsing.
378 //The saved offset must be placed here.
379 saved_next_mcluster = last_cluster_seen;
380 saved_next_moffset = i;
381 saved_pointer_valid = true;
382 trailers = true;
383 continue;
385 //This is a trailer entry.
386 if(buf[i + 3] == 2) {
387 //Pregap.
388 pregap_length = serialization::u32b(buf + i) >> 8;
389 } else if(buf[i + 3] == 3) {
390 //Postgap.
391 postgap_length = serialization::u32b(buf + i) >> 8;
392 } else if(buf[i + 3] == 4) {
393 //Gain.
394 gain = serialization::s16b(buf + i);
396 } else {
397 uint16_t psize = serialization::u16b(buf + i);
398 uint8_t plen = serialization::u8b(buf + i + 2);
399 total_size += psize;
400 total_len += 120 * plen;
401 opus_packetinfo p(psize, plen, 1ULL * next_cluster * CLUSTER_SIZE +
402 next_offset);
403 size_t r2 = fs.skip_data(next_cluster, next_offset, psize);
404 if(r2 < psize)
405 throw std::runtime_error("Incomplete data stream");
406 packets.push_back(p);
407 total_frames++;
410 out_parsing:
411 //If saved pointer is valid, restore to that.
412 if(saved_pointer_valid) {
413 next_mcluster = saved_next_mcluster;
414 next_moffset = saved_next_moffset;
418 opus_stream::opus_stream(uint64_t base, filesystem::ref filesys, std::ifstream& data,
419 external_stream_format extfmt)
420 : fs(filesys)
422 refcount = 1;
423 deleting = false;
424 total_len = 0;
425 s_timebase = base;
426 locked = false;
427 next_cluster = 0;
428 next_mcluster = 0;
429 next_offset = 0;
430 next_moffset = 0;
431 ctrl_cluster = 0;
432 data_cluster = 0;
433 pregap_length = 0;
434 postgap_length = 0;
435 gain = 0;
436 if(extfmt == EXTFMT_OGGOPUS)
437 import_stream_oggopus(data);
438 else if(extfmt == EXTFMT_SOX)
439 import_stream_sox(data);
442 void opus_stream::import_stream_oggopus(std::ifstream& data)
444 ogg::stream_reader_iostreams reader(data);
445 reader.set_errors_to(messages);
446 struct opus::ogg_header h;
447 struct opus::ogg_tags t;
448 ogg::page page;
449 ogg::demuxer d(messages);
450 int state = 0;
451 postgap_length = 0;
452 uint64_t datalen = 0;
453 uint64_t last_datalen = 0;
454 uint64_t last_granulepos = 0;
455 try {
456 while(true) {
457 ogg::packet p;
458 if(!d.wants_packet_out()) {
459 if(!reader.get_page(page))
460 break;
461 d.page_in(page);
462 continue;
463 } else
464 d.packet_out(p);
465 switch(state) {
466 case 0: //Not locked.
467 h.parse(p);
468 if(h.streams != 1)
469 throw std::runtime_error("Multistream OggOpus streams are not "
470 "supported");
471 state = 1; //Expecting comment.
472 pregap_length = h.preskip;
473 gain = h.gain;
474 break;
475 case 1: //Expecting comment.
476 t.parse(p);
477 state = 2; //Data page.
478 if(page.get_eos())
479 throw std::runtime_error("Empty OggOpus stream");
480 break;
481 case 2: //Data page.
482 case 3: //Data page.
483 const std::vector<uint8_t>& pkt = p.get_vector();
484 uint8_t tcnt = opus::packet_tick_count(&pkt[0], pkt.size());
485 if(tcnt) {
486 write(tcnt, &pkt[0], pkt.size());
487 datalen += tcnt * 120;
489 if(p.get_last_page()) {
490 uint64_t samples = p.get_granulepos() - last_granulepos;
491 if(samples > p.get_granulepos())
492 samples = 0;
493 uint64_t rsamples = datalen - last_datalen;
494 if((samples > rsamples && state == 3) || (samples <
495 rsamples && !p.get_on_eos_page()))
496 messages << "Warning: Granulepos says there are "
497 << samples << " samples, found " << rsamples
498 << std::endl;
499 last_datalen = datalen;
500 last_granulepos = p.get_granulepos();
501 if(p.get_on_eos_page()) {
502 if(samples < rsamples)
503 postgap_length = rsamples - samples;
504 state = 4;
505 goto out;
508 state = 3;
509 break;
512 out:
513 if(state == 0)
514 throw std::runtime_error("No OggOpus stream found");
515 if(state == 1)
516 throw std::runtime_error("Oggopus stream missing required tags pages");
517 if(state == 2 || state == 3)
518 messages << "Warning: Incomplete Oggopus stream." << std::endl;
519 if(datalen <= pregap_length)
520 throw std::runtime_error("Stream too short (entiere pregap not present)");
521 write_trailier();
522 } catch(...) {
523 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
524 if(data_cluster) fs.free_cluster_chain(data_cluster);
525 throw;
529 void opus_stream::import_stream_sox(std::ifstream& data)
531 bitrate_tracker brtrack;
532 int err;
533 unsigned char tmpi[65536];
534 float tmp[OPUS_MAX_OUT];
535 char header[260];
536 data.read(header, 32);
537 if(!data)
538 throw std::runtime_error("Can't read .sox header");
539 if(serialization::u32l(header + 0) != 0x586F532EULL)
540 throw std::runtime_error("Bad .sox header magic");
541 if(serialization::u8b(header + 4) > 28)
542 data.read(header + 32, serialization::u8b(header + 4) - 28);
543 if(!data)
544 throw std::runtime_error("Can't read .sox header");
545 if(serialization::u64l(header + 16) != 4676829883349860352ULL)
546 throw std::runtime_error("Bad .sox sampling rate");
547 if(serialization::u32l(header + 24) != 1)
548 throw std::runtime_error("Only mono streams are supported");
549 uint64_t samples = serialization::u64l(header + 8);
550 opus::encoder enc(opus::samplerate::r48k, false, opus::application::voice);
551 enc.ctl(opus::bitrate(opus_bitrate.get()));
552 int32_t pregap = enc.ctl(opus::lookahead);
553 pregap_length = pregap;
554 for(uint64_t i = 0; i < samples + pregap; i += OPUS_BLOCK_SIZE) {
555 size_t bs = OPUS_BLOCK_SIZE;
556 if(i + bs > samples + pregap)
557 bs = samples + pregap - i;
558 //We have to read zero bytes after the end of stream.
559 size_t readable = bs;
560 if(readable + i > samples)
561 readable = max(samples, i) - i;
562 if(readable > 0)
563 data.read(reinterpret_cast<char*>(tmpi), 4 * readable);
564 if(readable < bs)
565 memset(tmpi + 4 * readable, 0, 4 * (bs - readable));
566 if(!data) {
567 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
568 if(data_cluster) fs.free_cluster_chain(data_cluster);
569 throw std::runtime_error("Can't read .sox data");
571 for(size_t j = 0; j < bs; j++)
572 tmp[j] = static_cast<float>(serialization::s32l(tmpi + 4 * j)) / 268435456;
573 if(bs < OPUS_BLOCK_SIZE)
574 postgap_length = OPUS_BLOCK_SIZE - bs;
575 for(size_t j = bs; j < OPUS_BLOCK_SIZE; j++)
576 tmp[j] = 0;
577 try {
578 const size_t opus_out_max2 = opus_max_bitrate.get() * OPUS_BLOCK_SIZE / 384000;
579 size_t r = enc.encode(tmp, OPUS_BLOCK_SIZE, tmpi, opus_out_max2);
580 write(OPUS_BLOCK_SIZE / 120, tmpi, r);
581 brtrack.submit(r, bs);
582 } catch(std::exception& e) {
583 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
584 if(data_cluster) fs.free_cluster_chain(data_cluster);
585 (stringfmt() << "Error encoding opus packet: " << e.what()).throwex();
588 messages << "Imported stream: " << brtrack;
589 try {
590 write_trailier();
591 } catch(...) {
592 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
593 if(data_cluster) fs.free_cluster_chain(data_cluster);
594 throw;
598 void opus_stream::destroy()
600 if(deleting) {
601 //We catch the errors and print em, because otherwise put_ref could throw, which would
602 //be too much.
603 try {
604 fs.free_cluster_chain(ctrl_cluster);
605 } catch(std::exception& e) {
606 messages << "Failed to delete stream control file: " << e.what();
608 try {
609 fs.free_cluster_chain(data_cluster);
610 } catch(std::exception& e) {
611 messages << "Failed to delete stream data file: " << e.what();
614 delete this;
617 void opus_stream::export_stream_oggopus(std::ofstream& data)
619 if(!packets.size())
620 throw std::runtime_error("Empty oggopus stream is not valid");
621 opus::ogg_header header;
622 opus::ogg_tags tags;
623 ogg::stream_writer_iostreams writer(data);
624 unsigned stream_id = 1;
625 uint64_t true_granule = 0;
626 uint32_t seq = 2;
627 //Headers / Tags.
628 header.version = 1;
629 header.channels = 1;
630 header.preskip = pregap_length;
631 header.rate = OPUS_SAMPLERATE;
632 header.gain = 0;
633 header.map_family = 0;
634 header.streams = 1;
635 header.coupled = 0;
636 header.chanmap[0] = 0;
637 memset(header.chanmap + 1, 255, 254);
638 tags.vendor = "unknown";
639 tags.comments.push_back((stringfmt() << "ENCODER=lsnes rr" + lsnes_version).str());
640 tags.comments.push_back((stringfmt() << "LSNES_STREAM_TS=" << s_timebase).str());
642 struct ogg::page hpage = header.serialize();
643 hpage.set_stream(stream_id);
644 writer.put_page(hpage);
645 seq = tags.serialize([&writer](const ogg::page& p) { writer.put_page(p); }, stream_id);
647 struct ogg::page ppage;
648 ogg::muxer mux(stream_id, seq);
649 for(size_t i = 0; i < packets.size(); i++) {
650 std::vector<unsigned char> p;
651 try {
652 p = packet(i);
653 } catch(std::exception& e) {
654 (stringfmt() << "Error reading opus packet: " << e.what()).throwex();
656 if(!p.size())
657 (stringfmt() << "Empty Opus packet is not valid").throwex();
658 uint32_t samples = static_cast<uint32_t>(opus::packet_tick_count(&p[0], p.size())) * 120;
659 if(i + 1 < packets.size())
660 true_granule += samples;
661 else
662 true_granule = max(true_granule, true_granule + samples - postgap_length);
663 if(!mux.wants_packet_in() || !mux.packet_fits(p.size()))
664 while(mux.has_page_out()) {
665 mux.page_out(ppage);
666 writer.put_page(ppage);
668 mux.packet_in(p, true_granule);
670 mux.signal_eos();
671 while(mux.has_page_out()) {
672 mux.page_out(ppage);
673 writer.put_page(ppage);
677 void opus_stream::export_stream_sox(std::ofstream& data)
679 int err;
680 opus::decoder dec(opus::samplerate::r48k, false);
681 std::vector<unsigned char> p;
682 float tmp[OPUS_MAX_OUT];
683 char header[32];
684 serialization::u64l(header, 0x1C586F532EULL); //Magic and header size.
685 serialization::u64l(header + 16, 4676829883349860352ULL); //Sampling rate.
686 serialization::u32l(header + 24, 1);
687 uint64_t tlen = 0;
688 uint32_t lookahead_thrown = 0;
689 data.write(header, 32);
690 if(!data)
691 throw std::runtime_error("Error writing PCM data.");
692 float lgain = get_gain_linear();
693 for(size_t i = 0; i < packets.size(); i++) {
694 char blank[4] = {0, 0, 0, 0};
695 try {
696 uint32_t pregap_throw = 0;
697 uint32_t postgap_throw = 0;
698 std::vector<unsigned char> p = packet(i);
699 uint32_t len = packet_length(i);
700 size_t r = dec.decode(&p[0], p.size(), tmp, OPUS_MAX_OUT);
701 bool is_last = (i == packets.size() - 1);
702 if(lookahead_thrown < pregap_length) {
703 //We haven't yet thrown the full pregap. Throw some.
704 uint32_t maxthrow = pregap_length - lookahead_thrown;
705 pregap_throw = min(len, maxthrow);
706 lookahead_thrown += pregap_length;
708 if(is_last)
709 postgap_throw = min(len - pregap_throw, postgap_length);
710 tlen += (len - pregap_throw - postgap_throw);
711 for(uint32_t j = pregap_throw; j < len - postgap_throw; j++) {
712 int32_t s = (int32_t)(tmp[j] * lgain * 268435456.0);
713 serialization::s32l(blank, s);
714 data.write(blank, 4);
715 if(!data)
716 throw std::runtime_error("Error writing PCM data.");
718 } catch(std::exception& e) {
719 (stringfmt() << "Error decoding opus packet: " << e.what()).throwex();
722 data.seekp(0, std::ios_base::beg);
723 serialization::u64l(header + 8, tlen);
724 data.write(header, 32);
725 if(!data) {
726 throw std::runtime_error("Error writing PCM data.");
730 void opus_stream::export_stream(std::ofstream& data, external_stream_format extfmt)
732 if(extfmt == EXTFMT_OGGOPUS)
733 export_stream_oggopus(data);
734 else if(extfmt == EXTFMT_SOX)
735 export_stream_sox(data);
738 void opus_stream::write(uint8_t len, const unsigned char* payload, size_t payload_len)
740 try {
741 char descriptor[4];
742 uint32_t used_cluster, used_offset;
743 uint32_t used_mcluster, used_moffset;
744 if(!next_cluster)
745 next_cluster = data_cluster = fs.allocate_cluster();
746 if(!next_mcluster)
747 next_mcluster = ctrl_cluster = fs.allocate_cluster();
748 serialization::u16b(descriptor, payload_len);
749 serialization::u8b(descriptor + 2, len);
750 serialization::u8b(descriptor + 3, 1);
751 fs.write_data(next_cluster, next_offset, payload, payload_len, used_cluster, used_offset);
752 fs.write_data(next_mcluster, next_moffset, descriptor, 4, used_mcluster, used_moffset);
753 uint64_t off = static_cast<uint64_t>(used_cluster) * CLUSTER_SIZE + used_offset;
754 opus_packetinfo p(payload_len, len, off);
755 total_len += p.length();
756 packets.push_back(p);
757 } catch(std::exception& e) {
758 (stringfmt() << "Can't write opus packet: " << e.what()).throwex();
762 void opus_stream::write_trailier()
764 try {
765 char descriptor[16];
766 uint32_t used_mcluster, used_moffset;
767 //The allocation must be done for real.
768 if(!next_mcluster)
769 next_mcluster = ctrl_cluster = fs.allocate_cluster();
770 //But the write must not update the pointers..
771 uint32_t tmp_mcluster = next_mcluster;
772 uint32_t tmp_moffset = next_moffset;
773 serialization::u32b(descriptor, 0);
774 serialization::u32b(descriptor + 4, (pregap_length << 8) | 0x02);
775 serialization::u32b(descriptor + 8, (postgap_length << 8) | 0x03);
776 serialization::s16b(descriptor + 12, gain);
777 serialization::u16b(descriptor + 14, 0x0004);
778 fs.write_data(tmp_mcluster, tmp_moffset, descriptor, 16, used_mcluster, used_moffset);
779 } catch(std::exception& e) {
780 (stringfmt() << "Can't write stream trailer: " << e.what()).throwex();
785 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
786 //Playing opus stream.
787 struct opus_playback_stream
789 //Create a new playing stream from given opus stream.
790 opus_playback_stream(opus_stream& data);
791 //Destroy playing opus stream.
792 ~opus_playback_stream();
793 //Read samples from stream.
794 //Can throw.
795 void read(float* data, size_t samples);
796 //Skip samples from stream.
797 //Can throw.
798 void skip(uint64_t samples);
799 //Has the stream already ended?
800 bool eof();
801 private:
802 opus_playback_stream(const opus_playback_stream&);
803 opus_playback_stream& operator=(const opus_playback_stream&);
804 //Can throw.
805 void decode_block();
806 float output[OPUS_MAX_OUT];
807 unsigned output_left;
808 uint32_t pregap_thrown;
809 bool postgap_thrown;
810 opus::decoder* decoder;
811 opus_stream& stream;
812 uint32_t next_block;
813 uint32_t blocks;
816 opus_playback_stream::opus_playback_stream(opus_stream& data)
817 : stream(data)
819 int err;
820 stream.get_ref();
821 stream.lock();
822 next_block = 0;
823 output_left = 0;
824 pregap_thrown = 0;
825 postgap_thrown = false;
826 blocks = stream.blocks();
827 decoder = new opus::decoder(opus::samplerate::r48k, false);
828 if(!decoder)
829 throw std::bad_alloc();
832 opus_playback_stream::~opus_playback_stream()
834 //No, we don't unlock the stream.
835 stream.put_ref();
836 delete decoder;
839 bool opus_playback_stream::eof()
841 return (next_block >= blocks && !output_left);
844 void opus_playback_stream::decode_block()
846 if(next_block >= blocks)
847 return;
848 if(output_left >= OPUS_MAX_OUT)
849 return;
850 unsigned plen = stream.packet_length(next_block);
851 if(plen + output_left > OPUS_MAX_OUT)
852 return;
853 std::vector<unsigned char> pdata = stream.packet(next_block);
854 try {
855 size_t c = decoder->decode(&pdata[0], pdata.size(), output + output_left,
856 OPUS_MAX_OUT - output_left);
857 output_left = min(output_left + c, static_cast<size_t>(OPUS_MAX_OUT));
858 } catch(...) {
859 //Bad packet, insert silence.
860 for(unsigned i = 0; i < plen; i++)
861 output[output_left++] = 0;
863 //Throw the pregap away if needed.
864 if(pregap_thrown < stream.get_pregap()) {
865 uint32_t throw_amt = min(stream.get_pregap() - pregap_thrown, (uint32_t)output_left);
866 if(throw_amt && throw_amt < output_left)
867 memmove(output, output + throw_amt, (output_left - throw_amt) * sizeof(float));
868 output_left -= throw_amt;
869 pregap_thrown += throw_amt;
871 next_block++;
874 void opus_playback_stream::read(float* data, size_t samples)
876 float lgain = stream.get_gain_linear();
877 while(samples > 0) {
878 decode_block();
879 if(next_block >= blocks && !postgap_thrown) {
880 //This is the final packet. Throw away postgap samples at the end.
881 uint32_t thrown = min(stream.get_postgap(), (uint32_t)output_left);
882 output_left -= thrown;
883 postgap_thrown = true;
885 if(next_block >= blocks && !output_left) {
886 //Zerofill remainder.
887 for(size_t i = 0; i < samples; i++)
888 data[i] = 0;
889 return;
891 unsigned maxcopy = min(static_cast<unsigned>(samples), output_left);
892 if(maxcopy) {
893 memcpy(data, output, maxcopy * sizeof(float));
894 for(size_t i = 0; i < maxcopy; i++)
895 data[i] *= lgain;
897 if(maxcopy < output_left && maxcopy)
898 memmove(output, output + maxcopy, (output_left - maxcopy) * sizeof(float));
899 output_left -= maxcopy;
900 samples -= maxcopy;
901 data += maxcopy;
905 void opus_playback_stream::skip(uint64_t samples)
907 //Adjust for preskip and declare all preskip already thrown away.
908 pregap_thrown = stream.get_pregap();
909 samples += pregap_thrown;
910 postgap_thrown = false;
911 //First, skip inside decoded samples.
912 if(samples < output_left) {
913 //Skipping less than amount in output buffer. Just discard from output buffer and try
914 //to decode a new block.
915 memmove(output, output + samples, (output_left - samples) * sizeof(float));
916 output_left -= samples;
917 decode_block();
918 return;
919 } else {
920 //Skipping at least the amount of samples in output buffer. First, blank the output buffer
921 //and count those towards samples discarded.
922 samples -= output_left;
923 output_left = 0;
925 //While number of samples is so great that adequate convergence period can be ensured without
926 //decoding this packet, just skip the samples from the packet.
927 while(samples > OPUS_CONVERGE_MAX) {
928 samples -= stream.packet_length(next_block++);
929 //Did we hit EOF?
930 if(next_block >= blocks)
931 return;
933 //Okay, we are near the point. Start decoding packets.
934 while(samples > 0) {
935 decode_block();
936 //Did we hit EOF?
937 if(next_block >= blocks && !output_left)
938 return;
939 //Skip as many samples as possible.
940 unsigned maxskip = min(static_cast<unsigned>(samples), output_left);
941 if(maxskip < output_left)
942 memmove(output, output + maxskip, (output_left - maxskip) * sizeof(float));
943 output_left -= maxskip;
944 samples -= maxskip;
946 //Just to be nice, decode a extra block.
947 decode_block();
951 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
952 //Collection of streams.
953 struct stream_collection
955 public:
956 //Create a new collection.
957 //Can throw.
958 stream_collection(filesystem::ref filesys);
959 //Destroy a collection. All streams are destroyed but not deleted.
960 ~stream_collection();
961 //Get list of streams active at given point.
962 std::list<uint64_t> streams_at(uint64_t point);
963 //Add a stream into collection.
964 //Can throw.
965 uint64_t add_stream(opus_stream& stream);
966 //Get the filesystem this collection is for.
967 filesystem::ref get_filesystem() { return fs; }
968 //Unlock all streams in collection.
969 void unlock_all();
970 //Get stream with given index (NULL if not found).
971 opus_stream* get_stream(uint64_t index)
973 umutex_class m(mutex);
974 if(streams.count(index)) {
975 streams[index]->get_ref();
976 return streams[index];
978 return NULL;
980 //Delete a stream.
981 //Can throw.
982 void delete_stream(uint64_t index);
983 //Alter stream timebase.
984 //Can throw.
985 void alter_stream_timebase(uint64_t index, uint64_t newts);
986 //Alter stream gain.
987 void alter_stream_gain(uint64_t index, uint16_t newgain);
988 //Enumerate all valid stream indices, in time order.
989 std::list<uint64_t> all_streams();
990 //Export the entiere superstream.
991 //Can throw.
992 void export_superstream(std::ofstream& out);
993 private:
994 filesystem::ref fs;
995 uint64_t next_index;
996 unsigned next_stream;
997 mutex_class mutex;
998 std::set<uint64_t> free_indices;
999 std::map<uint64_t, uint64_t> entries;
1000 std::multimap<uint64_t, uint64_t> streams_by_time;
1001 //FIXME: Something more efficient.
1002 std::map<uint64_t, opus_stream*> streams;
1005 stream_collection::stream_collection(filesystem::ref filesys)
1006 : fs(filesys)
1008 next_stream = 0;
1009 next_index = 0;
1010 //The stream index table is in cluster 2.
1011 uint32_t next_cluster = 2;
1012 uint32_t next_offset = 0;
1013 uint32_t i = 0;
1014 try {
1015 while(true) {
1016 char buffer[16];
1017 size_t r = fs.read_data(next_cluster, next_offset, buffer, 16);
1018 if(r < 16)
1019 break;
1020 uint64_t timebase = serialization::u64b(buffer);
1021 uint32_t ctrl_cluster = serialization::u32b(buffer + 8);
1022 uint32_t data_cluster = serialization::u32b(buffer + 12);
1023 if(ctrl_cluster) {
1024 opus_stream* x = new opus_stream(timebase, fs, ctrl_cluster, data_cluster);
1025 entries[next_index] = i;
1026 streams_by_time.insert(std::make_pair(timebase, next_index));
1027 streams[next_index++] = x;
1028 } else
1029 free_indices.insert(i);
1030 next_stream = ++i;
1032 } catch(std::exception& e) {
1033 for(auto i : streams)
1034 i.second->put_ref();
1035 (stringfmt() << "Failed to parse LSVS: " << e.what()).throwex();
1039 stream_collection::~stream_collection()
1041 umutex_class m(mutex);
1042 for(auto i : streams)
1043 i.second->put_ref();
1044 streams.clear();
1047 std::list<uint64_t> stream_collection::streams_at(uint64_t point)
1049 umutex_class m(mutex);
1050 std::list<uint64_t> s;
1051 for(auto i : streams) {
1052 uint64_t start = i.second->timebase();
1053 uint64_t end = start + i.second->length();
1054 if(point >= start && point < end) {
1055 i.second->get_ref();
1056 s.push_back(i.first);
1059 return s;
1062 uint64_t stream_collection::add_stream(opus_stream& stream)
1064 uint64_t idx;
1065 try {
1066 umutex_class m(mutex);
1067 //Lock the added stream so it doesn't start playing back immediately.
1068 stream.lock();
1069 idx = next_index++;
1070 streams[idx] = &stream;
1071 char buffer[16];
1072 serialization::u64b(buffer, stream.timebase());
1073 auto r = stream.get_clusters();
1074 serialization::u32b(buffer + 8, r.first);
1075 serialization::u32b(buffer + 12, r.second);
1076 uint64_t entry_number = 0;
1077 if(free_indices.empty())
1078 entry_number = next_stream++;
1079 else {
1080 entry_number = *free_indices.begin();
1081 free_indices.erase(entry_number);
1083 uint32_t write_cluster = 2;
1084 uint32_t write_offset = 0;
1085 uint32_t dummy1, dummy2;
1086 fs.skip_data(write_cluster, write_offset, 16 * entry_number);
1087 fs.write_data(write_cluster, write_offset, buffer, 16, dummy1, dummy2);
1088 streams_by_time.insert(std::make_pair(stream.timebase(), idx));
1089 entries[idx] = entry_number;
1090 return idx;
1091 } catch(std::exception& e) {
1092 (stringfmt() << "Failed to add stream: " << e.what()).throwex();
1094 return idx;
1097 void stream_collection::unlock_all()
1099 umutex_class m(mutex);
1100 for(auto i : streams)
1101 i.second->unlock();
1104 void stream_collection::delete_stream(uint64_t index)
1106 umutex_class m(mutex);
1107 if(!entries.count(index))
1108 return;
1109 uint64_t entry_number = entries[index];
1110 uint32_t write_cluster = 2;
1111 uint32_t write_offset = 0;
1112 uint32_t dummy1, dummy2;
1113 char buffer[16] = {0};
1114 fs.skip_data(write_cluster, write_offset, 16 * entry_number);
1115 fs.write_data(write_cluster, write_offset, buffer, 16, dummy1, dummy2);
1116 auto itr = streams_by_time.lower_bound(streams[index]->timebase());
1117 auto itr2 = streams_by_time.upper_bound(streams[index]->timebase());
1118 for(auto x = itr; x != itr2; x++)
1119 if(x->second == index) {
1120 streams_by_time.erase(x);
1121 break;
1123 streams[index]->delete_stream();
1124 streams.erase(index);
1127 void stream_collection::alter_stream_timebase(uint64_t index, uint64_t newts)
1129 try {
1130 umutex_class m(mutex);
1131 if(!streams.count(index))
1132 return;
1133 if(entries.count(index)) {
1134 char buffer[8];
1135 uint32_t write_cluster = 2;
1136 uint32_t write_offset = 0;
1137 uint32_t dummy1, dummy2;
1138 serialization::u64b(buffer, newts);
1139 fs.skip_data(write_cluster, write_offset, 16 * entries[index]);
1140 fs.write_data(write_cluster, write_offset, buffer, 8, dummy1, dummy2);
1142 auto itr = streams_by_time.lower_bound(streams[index]->timebase());
1143 auto itr2 = streams_by_time.upper_bound(streams[index]->timebase());
1144 for(auto x = itr; x != itr2; x++)
1145 if(x->second == index) {
1146 streams_by_time.erase(x);
1147 break;
1149 streams[index]->timebase(newts);
1150 streams_by_time.insert(std::make_pair(newts, index));
1151 } catch(std::exception& e) {
1152 (stringfmt() << "Failed to alter stream timebase: " << e.what()).throwex();
1156 void stream_collection::alter_stream_gain(uint64_t index, uint16_t newgain)
1158 try {
1159 umutex_class m(mutex);
1160 if(!streams.count(index))
1161 return;
1162 streams[index]->set_gain(newgain);
1163 streams[index]->write_trailier();
1164 } catch(std::exception& e) {
1165 (stringfmt() << "Failed to alter stream gain: " << e.what()).throwex();
1169 std::list<uint64_t> stream_collection::all_streams()
1171 umutex_class m(mutex);
1172 std::list<uint64_t> s;
1173 for(auto i : streams_by_time)
1174 s.push_back(i.second);
1175 return s;
1178 void stream_collection::export_superstream(std::ofstream& out)
1180 std::list<uint64_t> slist = all_streams();
1181 //Find the total length of superstream.
1182 uint64_t len = 0;
1183 for(auto i : slist) {
1184 opus_stream* s = get_stream(i);
1185 if(s) {
1186 len = max(len, s->timebase() + s->length());
1187 s->put_ref();
1190 char header[32];
1191 serialization::u64l(header, 0x1C586F532EULL); //Magic and header size.
1192 serialization::u64l(header + 8, len);
1193 serialization::u64l(header + 16, 4676829883349860352ULL); //Sampling rate.
1194 serialization::u64l(header + 24, 1);
1195 out.write(header, 32);
1196 if(!out)
1197 throw std::runtime_error("Error writing PCM output");
1199 //Find the first valid stream.
1200 auto next_i = slist.begin();
1201 opus_stream* next_stream = NULL;
1202 while(next_i != slist.end()) {
1203 next_stream = get_stream(*next_i);
1204 next_i++;
1205 if(next_stream)
1206 break;
1208 uint64_t next_ts;
1209 next_ts = next_stream ? next_stream->timebase() : len;
1211 std::list<opus_playback_stream*> active;
1212 try {
1213 for(uint64_t s = 0; s < len;) {
1214 if(s == next_ts) {
1215 active.push_back(new opus_playback_stream(*next_stream));
1216 next_stream->put_ref();
1217 next_stream = NULL;
1218 while(next_i != slist.end()) {
1219 next_stream = get_stream(*next_i);
1220 next_i++;
1221 if(!next_stream)
1222 continue;
1223 uint64_t next_ts = next_stream->timebase();
1224 if(next_ts > s)
1225 break;
1226 //Okay, this starts too...
1227 active.push_back(new opus_playback_stream(*next_stream));
1228 next_stream->put_ref();
1229 next_stream = NULL;
1231 next_ts = next_stream ? next_stream->timebase() : len;
1233 uint64_t maxsamples = min(next_ts - s, static_cast<uint64_t>(OUTPUT_BLOCK));
1234 maxsamples = min(maxsamples, len - s);
1235 char outbuf[4 * OUTPUT_BLOCK];
1236 float buf1[OUTPUT_BLOCK];
1237 float buf2[OUTPUT_BLOCK];
1238 for(size_t t = 0; t < maxsamples; t++)
1239 buf1[t] = 0;
1240 for(auto t : active) {
1241 t->read(buf2, maxsamples);
1242 for(size_t u = 0; u < maxsamples; u++)
1243 buf1[u] += buf2[u];
1245 for(auto t = active.begin(); t != active.end();) {
1246 if((*t)->eof()) {
1247 auto todel = t;
1248 t++;
1249 delete *todel;
1250 active.erase(todel);
1251 } else
1252 t++;
1254 for(size_t t = 0; t < maxsamples; t++)
1255 serialization::s32l(outbuf + 4 * t, buf1[t] * 268435456);
1256 out.write(outbuf, 4 * maxsamples);
1257 if(!out)
1258 throw std::runtime_error("Failed to write PCM");
1259 s += maxsamples;
1261 } catch(std::exception& e) {
1262 (stringfmt() << "Failed to export PCM: " << e.what()).throwex();
1264 for(auto t = active.begin(); t != active.end();) {
1265 if((*t)->eof()) {
1266 auto todelete = t;
1267 t++;
1268 delete *todelete;
1269 active.erase(todelete);
1270 } else
1271 t++;
1275 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1276 void start_management_stream(opus_stream& s)
1278 opus_playback_stream* p = new opus_playback_stream(s);
1279 umutex_class m(active_playback_streams_lock);
1280 active_playback_streams.push_back(p);
1283 void advance_time(uint64_t newtime)
1285 umutex_class m2(current_collection_lock);
1286 if(!current_collection) {
1287 //Clear all.
1288 umutex_class m(active_playback_streams_lock);
1289 for(auto i : active_playback_streams)
1290 delete i;
1291 active_playback_streams.clear();
1292 return;
1294 std::list<uint64_t> sactive = current_collection->streams_at(newtime);
1295 for(auto j : sactive) {
1296 opus_stream* i = current_collection->get_stream(j);
1297 if(!i)
1298 continue;
1299 //Don't play locked streams in order to avoid double playing.
1300 umutex_class m(active_playback_streams_lock);
1301 try {
1302 if(!i->islocked())
1303 active_playback_streams.push_back(new opus_playback_stream(*i));
1304 } catch(std::exception& e) {
1305 messages << "Can't start stream: " << e.what() << std::endl;
1307 i->put_ref();
1311 void jump_time(uint64_t newtime)
1313 umutex_class m2(current_collection_lock);
1314 if(!current_collection) {
1315 //Clear all.
1316 umutex_class m(active_playback_streams_lock);
1317 for(auto i : active_playback_streams)
1318 delete i;
1319 active_playback_streams.clear();
1320 return;
1322 //Close all currently playing streams.
1324 umutex_class m(active_playback_streams_lock);
1325 for(auto i : active_playback_streams)
1326 delete i;
1327 active_playback_streams.clear();
1329 //Unlock all streams, so they will play.
1330 current_collection->unlock_all();
1331 //Reopen all streams that should be open (with seeking)
1332 std::list<uint64_t> sactive = current_collection->streams_at(newtime);
1333 for(auto j : sactive) {
1334 opus_stream* i = current_collection->get_stream(j);
1335 if(!i)
1336 continue;
1337 //No need to check for locks, because we just busted all of those.
1338 uint64_t p = newtime - i->timebase();
1339 opus_playback_stream* s;
1340 try {
1341 s = new opus_playback_stream(*i);
1342 } catch(std::exception& e) {
1343 messages << "Can't start stream: " << e.what() << std::endl;
1345 i->put_ref();
1346 if(!s)
1347 continue;
1348 s->skip(p);
1349 umutex_class m(active_playback_streams_lock);
1350 active_playback_streams.push_back(s);
1354 //Resample.
1355 void do_resample(audioapi_resampler& r, float* srcbuf, size_t& srcuse, float* dstbuf, size_t& dstuse,
1356 size_t dstmax, double ratio)
1358 if(srcuse == 0 || dstuse >= dstmax)
1359 return;
1360 float* in = srcbuf;
1361 size_t in_u = srcuse;
1362 float* out = dstbuf + dstuse;
1363 size_t out_u = dstmax - dstuse;
1364 r.resample(in, in_u, out, out_u, ratio, false);
1365 size_t offset = in - srcbuf;
1366 if(offset < srcuse)
1367 memmove(srcbuf, srcbuf + offset, sizeof(float) * (srcuse - offset));
1368 srcuse -= offset;
1369 dstuse = dstmax - out_u;
1372 //Drain the input buffer.
1373 void drain_input()
1375 while(audioapi_voice_r_status() > 0) {
1376 float buf[256];
1377 unsigned size = min(audioapi_voice_r_status(), 256u);
1378 audioapi_record_voice(buf, size);
1382 //Read the input buffer.
1383 void read_input(float* buf, size_t& use, size_t maxuse)
1385 size_t rleft = audioapi_voice_r_status();
1386 unsigned toread = min(rleft, max(maxuse, use) - use);
1387 if(toread > 0) {
1388 audioapi_record_voice(buf + use, toread);
1389 use += toread;
1393 //Compress Opus block.
1394 void compress_opus_block(opus::encoder& e, float* buf, size_t& use, opus_stream& active_stream,
1395 bitrate_tracker& brtrack)
1397 const size_t opus_out_max = 1276;
1398 unsigned char opus_output[opus_out_max];
1399 size_t cblock = 0;
1400 if(use >= 960)
1401 cblock = 960;
1402 else if(use >= 480)
1403 cblock = 480;
1404 else if(use >= 240)
1405 cblock = 240;
1406 else if(use >= 120)
1407 cblock = 120;
1408 else
1409 return; //No valid data to compress.
1410 const size_t opus_out_max2 = opus_max_bitrate.get() * cblock / 384000;
1411 try {
1412 size_t c = e.encode(buf, cblock, opus_output, opus_out_max2);
1413 //Successfully compressed a block.
1414 size_t opus_output_len = c;
1415 brtrack.submit(c, cblock);
1416 try {
1417 active_stream.write(cblock / 120, opus_output, opus_output_len);
1418 } catch(std::exception& e) {
1419 messages << "Error writing data: " << e.what() << std::endl;
1421 } catch(std::exception& e) {
1422 messages << "Opus encoder error: " << e.what() << std::endl;
1424 use -= cblock;
1427 void update_time()
1429 uint64_t sampletime;
1430 bool jumping;
1432 umutex_class m(time_mutex);
1433 sampletime = current_time;
1434 jumping = time_jump;
1435 time_jump = false;
1437 if(jumping)
1438 jump_time(sampletime);
1439 else
1440 advance_time(sampletime);
1443 void decompress_active_streams(float* out, size_t& use)
1445 size_t base = use;
1446 use += OUTPUT_BLOCK;
1447 for(unsigned i = 0; i < OUTPUT_BLOCK; i++)
1448 out[i + base] = 0;
1449 //Do it this way to minimize the amount of time playback streams lock
1450 //is held.
1451 std::list<opus_playback_stream*> stmp;
1453 umutex_class m(active_playback_streams_lock);
1454 stmp = active_playback_streams;
1456 std::set<opus_playback_stream*> toerase;
1457 for(auto i : stmp) {
1458 float tmp[OUTPUT_BLOCK];
1459 try {
1460 i->read(tmp, OUTPUT_BLOCK);
1461 } catch(std::exception& e) {
1462 messages << "Failed to decompress: " << e.what() << std::endl;
1463 for(unsigned j = 0; j < OUTPUT_BLOCK; j++)
1464 tmp[j] = 0;
1466 for(unsigned j = 0; j < OUTPUT_BLOCK; j++)
1467 out[j + base] += tmp[j];
1468 if(i->eof())
1469 toerase.insert(i);
1472 umutex_class m(active_playback_streams_lock);
1473 for(auto i = active_playback_streams.begin(); i != active_playback_streams.end();) {
1474 if(toerase.count(*i)) {
1475 auto toerase = i;
1476 i++;
1477 delete *toerase;
1478 active_playback_streams.erase(toerase);
1479 } else
1480 i++;
1485 void handle_tangent_positive_edge(opus::encoder& e, opus_stream*& active_stream, bitrate_tracker& brtrack)
1487 umutex_class m2(current_collection_lock);
1488 if(!current_collection)
1489 return;
1490 try {
1491 e.ctl(opus::reset);
1492 e.ctl(opus::bitrate(opus_bitrate.get()));
1493 brtrack.reset();
1494 uint64_t ctime;
1496 umutex_class m(time_mutex);
1497 ctime = current_time;
1499 active_stream = NULL;
1500 active_stream = new opus_stream(ctime, current_collection->get_filesystem());
1501 int32_t pregap = e.ctl(opus::lookahead);
1502 active_stream->set_pregap(pregap);
1503 } catch(std::exception& e) {
1504 messages << "Can't start stream: " << e.what() << std::endl;
1505 return;
1507 messages << "Tangent enaged." << std::endl;
1510 void handle_tangent_negative_edge(opus_stream*& active_stream, bitrate_tracker& brtrack)
1512 umutex_class m2(current_collection_lock);
1513 messages << "Tangent disenaged: " << brtrack;
1514 try {
1515 active_stream->write_trailier();
1516 } catch(std::exception& e) {
1517 messages << e.what() << std::endl;
1519 if(current_collection) {
1520 try {
1521 current_collection->add_stream(*active_stream);
1522 } catch(std::exception& e) {
1523 messages << "Can't add stream: " << e.what() << std::endl;
1524 active_stream->put_ref();
1526 notify_voice_stream_change();
1527 } else
1528 active_stream->put_ref();
1529 active_stream = NULL;
1532 class inthread_th : public worker_thread
1534 public:
1535 inthread_th()
1537 quit = false;
1538 quit_ack = false;
1539 rptr = 0;
1540 fire();
1542 void kill()
1544 quit = true;
1546 umutex_class h(lmut);
1547 lcond.notify_all();
1549 while(!quit_ack)
1550 usleep(100000);
1551 usleep(100000);
1553 protected:
1554 void entry()
1556 try {
1557 entry2();
1558 } catch(std::bad_alloc& e) {
1559 OOM_panic();
1560 } catch(std::exception& e) {
1561 messages << "AIEEE... Fatal exception in voice thread: " << e.what() << std::endl;
1563 quit_ack = true;
1565 void entry2()
1567 //Wait for libopus to load...
1568 size_t cbh = opus::add_callback([this]() {
1569 umutex_class h(this->lmut);
1570 this->lcond.notify_all();
1572 while(true) {
1573 umutex_class h(lmut);
1574 if(opus::libopus_loaded() || quit)
1575 break;
1576 lcond.wait(h);
1578 opus::cancel_callback(cbh);
1579 if(quit)
1580 return;
1582 int err;
1583 opus::encoder oenc(opus::samplerate::r48k, false, opus::application::voice);
1584 oenc.ctl(opus::bitrate(opus_bitrate.get()));
1585 audioapi_resampler rin;
1586 audioapi_resampler rout;
1587 const unsigned buf_max = 6144; //These buffers better be large.
1588 size_t buf_in_use = 0;
1589 size_t buf_inr_use = 0;
1590 size_t buf_outr_use = 0;
1591 size_t buf_out_use = 0;
1592 float buf_in[buf_max];
1593 float buf_inr[OPUS_BLOCK_SIZE];
1594 float buf_outr[OUTPUT_SIZE];
1595 float buf_out[buf_max];
1596 bitrate_tracker brtrack;
1597 opus_stream* active_stream = NULL;
1599 drain_input();
1600 while(1) {
1601 if(clear_workflag(WORKFLAG_QUIT_REQUEST) & WORKFLAG_QUIT_REQUEST) {
1602 if(!active_flag && active_stream)
1603 handle_tangent_negative_edge(active_stream, brtrack);
1604 break;
1606 uint64_t ticks = get_utime();
1607 //Handle tangent edgets.
1608 if(active_flag && !active_stream) {
1609 drain_input();
1610 buf_in_use = 0;
1611 buf_inr_use = 0;
1612 handle_tangent_positive_edge(oenc, active_stream, brtrack);
1614 else if((!active_flag || quit) && active_stream)
1615 handle_tangent_negative_edge(active_stream, brtrack);
1616 if(quit)
1617 break;
1619 //Read input, up to 25ms.
1620 unsigned rate_in = audioapi_voice_rate().first;
1621 unsigned rate_out = audioapi_voice_rate().second;
1622 size_t dbuf_max = min(buf_max, rate_in / REC_THRESHOLD_DIV);
1623 read_input(buf_in, buf_in_use, dbuf_max);
1625 //Resample up to full opus block.
1626 do_resample(rin, buf_in, buf_in_use, buf_inr, buf_inr_use, OPUS_BLOCK_SIZE,
1627 1.0 * OPUS_SAMPLERATE / rate_in);
1629 //If we have full opus block and recording is enabled, compress it.
1630 if(buf_inr_use >= OPUS_BLOCK_SIZE && active_stream)
1631 compress_opus_block(oenc, buf_inr, buf_inr_use, *active_stream, brtrack);
1633 //Update time, starting/ending streams.
1634 update_time();
1636 //Decompress active streams.
1637 if(buf_outr_use < BLOCK_THRESHOLD)
1638 decompress_active_streams(buf_outr, buf_outr_use);
1640 //Resample to output rate.
1641 do_resample(rout, buf_outr, buf_outr_use, buf_out, buf_out_use, buf_max,
1642 1.0 * rate_out / OPUS_SAMPLERATE);
1644 //Output stuff.
1645 if(buf_out_use > 0 && audioapi_voice_p_status2() < rate_out / PLAY_THRESHOLD_DIV) {
1646 audioapi_play_voice(buf_out, buf_out_use);
1647 buf_out_use = 0;
1650 //Sleep a bit to save CPU use.
1651 uint64_t ticks_spent = get_utime() - ticks;
1652 if(ticks_spent < ITERATION_TIME)
1653 usleep(ITERATION_TIME - ticks_spent);
1655 umutex_class h(current_collection_lock);
1656 delete current_collection;
1657 current_collection = NULL;
1659 private:
1660 size_t rptr;
1661 double position;
1662 volatile bool quit;
1663 volatile bool quit_ack;
1664 mutex_class lmut;
1665 cv_class lcond;
1668 //The tangent function.
1669 command::fnptr<> ptangent(lsnes_cmd, "+tangent", "Voice tangent",
1670 "Syntax: +tangent\nVoice tangent.\n",
1671 []() throw(std::bad_alloc, std::runtime_error) {
1672 active_flag = true;
1674 command::fnptr<> ntangent(lsnes_cmd, "-tangent", "Voice tangent",
1675 "Syntax: -tangent\nVoice tangent.\n",
1676 []() throw(std::bad_alloc, std::runtime_error) {
1677 active_flag = false;
1679 keyboard::invbind itangent(lsnes_mapper, "+tangent", "Movie‣Voice tangent");
1680 inthread_th* int_task;
1683 //Rate is not sampling rate!
1684 void voice_frame_number(uint64_t newframe, double rate)
1686 if(rate == last_rate && last_frame_number == newframe)
1687 return;
1688 umutex_class m(time_mutex);
1689 current_time = newframe / rate * OPUS_SAMPLERATE;
1690 if(fabs(rate - last_rate) > 1e-6 || last_frame_number + 1 != newframe)
1691 time_jump = true;
1692 last_frame_number = newframe;
1693 last_rate = rate;
1696 void voicethread_task()
1698 int_task = new inthread_th;
1701 void voicethread_kill()
1703 int_task->kill();
1704 delete int_task;
1705 int_task = NULL;
1708 uint64_t voicesub_parse_timebase(const std::string& n)
1710 std::string x = n;
1711 if(x.length() > 0 && x[x.length() - 1] == 's') {
1712 x = x.substr(0, x.length() - 1);
1713 return 48000 * parse_value<double>(x);
1714 } else
1715 return parse_value<uint64_t>(x);
1718 bool voicesub_collection_loaded()
1720 umutex_class m2(current_collection_lock);
1721 return (current_collection != NULL);
1724 std::list<playback_stream_info> voicesub_get_stream_info()
1726 umutex_class m2(current_collection_lock);
1727 std::list<playback_stream_info> in;
1728 if(!current_collection)
1729 return in;
1730 for(auto i : current_collection->all_streams()) {
1731 opus_stream* s = current_collection->get_stream(i);
1732 playback_stream_info pi;
1733 if(!s)
1734 continue;
1735 pi.id = i;
1736 pi.base = s->timebase();
1737 pi.length = s->length();
1738 try {
1739 in.push_back(pi);
1740 } catch(...) {
1742 s->put_ref();
1744 return in;
1747 void voicesub_play_stream(uint64_t id)
1749 umutex_class m2(current_collection_lock);
1750 if(!current_collection)
1751 throw std::runtime_error("No collection loaded");
1752 opus_stream* s = current_collection->get_stream(id);
1753 if(!s)
1754 return;
1755 try {
1756 start_management_stream(*s);
1757 } catch(...) {
1758 s->put_ref();
1759 throw;
1761 s->put_ref();
1764 void voicesub_export_stream(uint64_t id, const std::string& filename, external_stream_format fmt)
1766 umutex_class m2(current_collection_lock);
1767 if(!current_collection)
1768 throw std::runtime_error("No collection loaded");
1769 opus_stream* st = current_collection->get_stream(id);
1770 if(!st)
1771 return;
1772 std::ofstream s(filename, std::ios_base::out | std::ios_base::binary);
1773 if(!s) {
1774 st->put_ref();
1775 throw std::runtime_error("Can't open output file");
1777 try {
1778 st->export_stream(s, fmt);
1779 } catch(std::exception& e) {
1780 st->put_ref();
1781 (stringfmt() << "Export failed: " << e.what()).throwex();
1783 st->put_ref();
1786 uint64_t voicesub_import_stream(uint64_t ts, const std::string& filename, external_stream_format fmt)
1788 umutex_class m2(current_collection_lock);
1789 if(!current_collection)
1790 throw std::runtime_error("No collection loaded");
1792 std::ifstream s(filename, std::ios_base::in | std::ios_base::binary);
1793 if(!s)
1794 throw std::runtime_error("Can't open input file");
1795 opus_stream* st = new opus_stream(ts, current_collection->get_filesystem(), s, fmt);
1796 uint64_t id;
1797 try {
1798 id = current_collection->add_stream(*st);
1799 } catch(...) {
1800 st->delete_stream();
1801 throw;
1803 st->unlock(); //Not locked.
1804 notify_voice_stream_change();
1805 return id;
1808 void voicesub_delete_stream(uint64_t id)
1810 umutex_class m2(current_collection_lock);
1811 if(!current_collection)
1812 throw std::runtime_error("No collection loaded");
1813 current_collection->delete_stream(id);
1814 notify_voice_stream_change();
1817 void voicesub_export_superstream(const std::string& filename)
1819 umutex_class m2(current_collection_lock);
1820 if(!current_collection)
1821 throw std::runtime_error("No collection loaded");
1822 std::ofstream s(filename, std::ios_base::out | std::ios_base::binary);
1823 if(!s)
1824 throw std::runtime_error("Can't open output file");
1825 current_collection->export_superstream(s);
1828 void voicesub_load_collection(const std::string& filename)
1830 umutex_class m2(current_collection_lock);
1831 filesystem::ref newfs;
1832 stream_collection* newc;
1833 newfs = filesystem::ref(filename);
1834 newc = new stream_collection(newfs);
1835 if(current_collection)
1836 delete current_collection;
1837 current_collection = newc;
1838 notify_voice_stream_change();
1841 void voicesub_unload_collection()
1843 umutex_class m2(current_collection_lock);
1844 if(current_collection)
1845 delete current_collection;
1846 current_collection = NULL;
1847 notify_voice_stream_change();
1850 void voicesub_alter_timebase(uint64_t id, uint64_t ts)
1852 umutex_class m2(current_collection_lock);
1853 if(!current_collection)
1854 throw std::runtime_error("No collection loaded");
1855 current_collection->alter_stream_timebase(id, ts);
1856 notify_voice_stream_change();
1859 float voicesub_get_gain(uint64_t id)
1861 umutex_class m2(current_collection_lock);
1862 if(!current_collection)
1863 throw std::runtime_error("No collection loaded");
1864 return current_collection->get_stream(id)->get_gain() / 256.0;
1867 void voicesub_set_gain(uint64_t id, float gain)
1869 umutex_class m2(current_collection_lock);
1870 if(!current_collection)
1871 throw std::runtime_error("No collection loaded");
1872 int64_t _gain = gain * 256;
1873 if(_gain < -32768 || _gain > 32767)
1874 throw std::runtime_error("Gain out of range (+-128dB)");
1875 current_collection->alter_stream_gain(id, _gain);
1876 notify_voice_stream_change();
//Convert a timestamp to seconds by dividing it by 48000.
double voicesub_ts_seconds(uint64_t ts)
{
	const double samples = static_cast<double>(ts);
	return samples / 48000.0;
}