lsnes rr2-β24
[lsnes.git] / src / core / inthread.cpp
blobeddad8d02c3a950a7763d8a44f0d36c503fbc5b4
1 #include "lsnes.hpp"
3 #include "cmdhelp/commentary.hpp"
4 #include "core/audioapi.hpp"
5 #include "core/command.hpp"
6 #include "core/dispatch.hpp"
7 #include "core/framerate.hpp"
8 #include "core/instance.hpp"
9 #include "core/inthread.hpp"
10 #include "core/keymapper.hpp"
11 #include "core/messages.hpp"
12 #include "core/random.hpp"
13 #include "core/settings.hpp"
14 #include "library/filesystem.hpp"
15 #include "library/minmax.hpp"
16 #include "library/ogg.hpp"
17 #include "library/opus.hpp"
18 #include "library/opus-ogg.hpp"
19 #include "library/serialization.hpp"
20 #include "library/string.hpp"
21 #include "library/workthread.hpp"
23 #include <cstdint>
24 #include <cmath>
25 #include <list>
26 #include <iostream>
27 #include <fstream>
28 #include <cstring>
29 #include <unistd.h>
30 #include <sys/time.h>
31 #include <zlib.h>
33 //Farther than this, packets can be fastskipped.
34 #define OPUS_CONVERGE_MAX 5760
35 //Maximum size of PCM output for one packet.
36 #define OPUS_MAX_OUT 5760
37 //Output block size.
38 #define OUTPUT_BLOCK 1440
39 //Main sampling rate.
40 #define OPUS_SAMPLERATE 48000
41 //Opus block size
42 #define OPUS_BLOCK_SIZE 960
43 //Threshold for decoding additional block
44 #define BLOCK_THRESHOLD 1200
45 //Maximum output block size.
46 #define OUTPUT_SIZE (BLOCK_THRESHOLD + OUTPUT_BLOCK)
47 //Number of microseconds per iteration.
48 #define ITERATION_TIME 15000
49 //Opus bitrate to use.
50 #define OPUS_BITRATE 48000
51 //Opus min bitrate to use.
52 #define OPUS_MIN_BITRATE 8000
53 //Opus max bitrate to use.
54 #define OPUS_MAX_BITRATE 255000
55 //Ogg Opus granule rate.
56 #define OGGOPUS_GRANULERATE 48000
57 //Record buffer size threshold divider.
58 #define REC_THRESHOLD_DIV 40
59 //Playback buffer size threshold divider.
60 #define PLAY_THRESHOLD_DIV 30
61 //Special granule position: None.
62 #define GRANULEPOS_NONE 0xFFFFFFFFFFFFFFFFULL
64 namespace
66 class opus_playback_stream;
67 class opus_stream;
68 class stream_collection;
69 class bitrate_tracker;
70 class inthread_th;
72 settingvar::supervariable<settingvar::model_int<OPUS_MIN_BITRATE,OPUS_MAX_BITRATE>> SET_opus_bitrate(
73 lsnes_setgrp, "opus-bitrate", "commentary‣Bitrate", OPUS_BITRATE);
74 settingvar::supervariable<settingvar::model_int<OPUS_MIN_BITRATE,OPUS_MAX_BITRATE>> SET_opus_max_bitrate(
75 lsnes_setgrp, "opus-max-bitrate", "commentary‣Max bitrate", OPUS_MAX_BITRATE);
77 struct voicesub_state
79 voicesub_state(settingvar::group& _settings, emulator_dispatch& _dispatch, audioapi_instance& _audio)
80 : settings(_settings), edispatch(_dispatch), audio(_audio)
82 current_time = 0;
83 time_jump = false;
84 active_flag = false;
85 last_frame_number = 0;
86 last_rate = 0;
87 current_collection = NULL;
88 int_task = NULL;
90 //Recording active flag.
91 volatile bool active_flag;
92 //Last seen frame number.
93 uint64_t last_frame_number;
94 //Last seen rate.
95 double last_rate;
96 //Mutex protecting current_time and time_jump.
97 threads::lock time_mutex;
98 //The current time.
99 uint64_t current_time;
100 //Time jump flag. Set if time jump is detected.
101 //If time jump is detected, all current playing streams are stopped, stream locks are cleared and
102 //apropriate streams are restarted. If time jump is false, all unlocked streams coming into range
103 //are started.
104 bool time_jump;
105 //Lock protecting active_playback_streams.
106 threads::lock active_playback_streams_lock;
107 //List of streams currently playing.
108 std::list<opus_playback_stream*> active_playback_streams;
109 //The collection of streams.
110 stream_collection* current_collection;
111 //Lock protecting current collection.
112 threads::lock current_collection_lock;
113 //The task handling the stuff.
114 inthread_th* int_task;
115 //Functions.
116 void start_management_stream(opus_stream& s);
117 void advance_time(uint64_t newtime);
118 void jump_time(uint64_t newtime);
119 void do_resample(audioapi_instance::resampler& r, float* srcbuf, size_t& srcuse, float* dstbuf,
120 size_t& dstuse, size_t dstmax, double ratio);
121 void drain_input();
122 void read_input(float* buf, size_t& use, size_t maxuse);
123 void compress_opus_block(opus::encoder& e, float* buf, size_t& use,
124 opus_stream& active_stream, bitrate_tracker& brtrack);
125 void update_time();
126 void decompress_active_streams(float* out, size_t& use);
127 void handle_tangent_positive_edge(opus::encoder& e, opus_stream*& active_stream,
128 bitrate_tracker& brtrack);
129 void handle_tangent_negative_edge(opus_stream*& active_stream, bitrate_tracker& brtrack);
130 settingvar::group& settings;
131 emulator_dispatch& edispatch;
132 audioapi_instance& audio;
135 voicesub_state* get_state(void* ptr)
137 auto x = reinterpret_cast<voicesub_state*>(ptr);
138 if(!x)
139 throw std::runtime_error("voice_commentary not initialized");
140 return x;
143 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
144 //Bitrate tracker.
//Accumulates statistics (bytes, samples, block count, min/max instantaneous rate) over encoded
//Opus blocks.
struct bitrate_tracker
{
	//Start with empty statistics.
	bitrate_tracker() noexcept;
	//Forget everything submitted so far.
	void reset() noexcept;
	//Lowest instantaneous bitrate seen, in kbps (0 if nothing was submitted).
	double get_min() const noexcept;
	//Average bitrate over all submitted samples, in kbps (0 if no samples were submitted).
	double get_avg() const noexcept;
	//Highest instantaneous bitrate seen, in kbps (0 if nothing was submitted).
	double get_max() const noexcept;
	//Total length of submitted audio in seconds.
	double get_length() const noexcept;
	//Total number of bytes submitted.
	uint64_t get_bytes() const noexcept;
	//Number of blocks submitted.
	uint64_t get_blocks() const noexcept;
	//Account for one encoded block of the given size that covers the given number of samples.
	void submit(uint32_t bytes, uint32_t samples) noexcept;
private:
	uint64_t blocks;
	uint64_t samples;
	uint64_t bytes;
	uint32_t minrate;
	uint32_t maxrate;
};

bitrate_tracker::bitrate_tracker() noexcept
{
	reset();
}

void bitrate_tracker::reset() noexcept
{
	blocks = 0;
	samples = 0;
	bytes = 0;
	minrate = std::numeric_limits<uint32_t>::max();
	maxrate = 0;
}

double bitrate_tracker::get_min() const noexcept
{
	return blocks ? minrate / 1000.0 : 0.0;
}

double bitrate_tracker::get_avg() const noexcept
{
	//bytes * 8 / 1000 kilobits divided by samples / OPUS_SAMPLERATE seconds.
	return samples ? bytes / (125.0 * samples / OPUS_SAMPLERATE) : 0.0;
}

double bitrate_tracker::get_max() const noexcept
{
	return blocks ? maxrate / 1000.0 : 0.0;
}

double bitrate_tracker::get_length() const noexcept
{
	return 1.0 * samples / OPUS_SAMPLERATE;
}

uint64_t bitrate_tracker::get_bytes() const noexcept
{
	return bytes;
}

uint64_t bitrate_tracker::get_blocks() const noexcept
{
	return blocks;
}

void bitrate_tracker::submit(uint32_t _bytes, uint32_t _samples) noexcept
{
	blocks++;
	samples += _samples;
	bytes += _bytes;
	//The instantaneous rate assumes one OPUS_BLOCK_SIZE block. Compute it in 64 bits: the 32-bit
	//product _bytes * 8 * OPUS_SAMPLERATE wraps once _bytes exceeds about 11 kB. Saturate on the
	//way back to the 32-bit counters.
	const uint64_t wide = static_cast<uint64_t>(_bytes) * 8 * OPUS_SAMPLERATE / OPUS_BLOCK_SIZE;
	const uint32_t cap = std::numeric_limits<uint32_t>::max();
	const uint32_t irate = (wide > cap) ? cap : static_cast<uint32_t>(wide);
	if(irate < minrate)
		minrate = irate;
	if(irate > maxrate)
		maxrate = irate;
}

//Print a two-line human readable summary of the tracker.
std::ostream& operator<<(std::ostream& s, const bitrate_tracker& t)
{
	s << t.get_bytes() << " bytes for " << t.get_length() << "s (" << t.get_blocks() << " blocks)"
		<< std::endl << "Bitrate (kbps): min: " << t.get_min() << " avg: " << t.get_avg() << " max: "
		<< t.get_max() << std::endl;
	return s;
}
226 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
227 //Information about individual opus packet in stream.
228 struct opus_packetinfo
230 //Length is in units of 1/400th of a second.
231 opus_packetinfo(uint16_t datasize, uint8_t length, uint64_t offset)
233 descriptor = (offset & 0xFFFFFFFFFFULL) | (static_cast<uint64_t>(length) << 40) |
234 (static_cast<uint64_t>(datasize) << 48);
236 //Get the data size of the packet.
237 uint16_t size() { return descriptor >> 48; }
238 //Calculate the length of packet in samples.
239 uint16_t length() { return 120 * ((descriptor >> 40) & 0xFF); }
240 //Calculate the true offset.
241 uint64_t offset() { return descriptor & 0xFFFFFFFFFFULL; }
242 //Read the packet.
243 //Can throw.
244 std::vector<unsigned char> packet(filesystem::ref from_sys);
245 private:
246 uint64_t descriptor;
249 std::vector<unsigned char> opus_packetinfo::packet(filesystem::ref from_sys)
251 std::vector<unsigned char> ret;
252 uint64_t off = offset();
253 uint32_t sz = size();
254 uint32_t cluster = off / CLUSTER_SIZE;
255 uint32_t coff = off % CLUSTER_SIZE;
256 ret.resize(sz);
257 size_t r = from_sys.read_data(cluster, coff, &ret[0], sz);
258 if(r != sz)
259 throw std::runtime_error("Incomplete read");
260 return ret;
263 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
264 //Information about opus stream.
265 struct opus_stream
267 //Create new empty stream with specified base time.
268 opus_stream(uint64_t base, filesystem::ref filesys);
269 //Read stream with specified base time and specified start clusters.
270 //Can throw.
271 opus_stream(uint64_t base, filesystem::ref filesys, uint32_t ctrl_cluster, uint32_t data_cluster);
272 //Import a stream with specified base time.
273 //Can throw.
274 opus_stream(uint64_t base, filesystem::ref filesys, std::ifstream& data,
275 voice_commentary::external_stream_format extfmt, settingvar::group& settings);
276 //Delete this stream (also puts a ref)
277 void delete_stream() { deleting = true; put_ref(); }
278 //Export a stream.
279 //Can throw.
280 void export_stream(std::ofstream& data, voice_commentary::external_stream_format extfmt);
281 //Get length of specified packet in samples.
282 uint16_t packet_length(uint32_t seqno)
284 return (seqno < packets.size()) ? packets[seqno].length() : 0;
286 //Get data of specified packet.
287 //Can throw.
288 std::vector<unsigned char> packet(uint32_t seqno)
290 return (seqno < packets.size()) ? packets[seqno].packet(fs) : std::vector<unsigned char>();
292 //Get base time in samples for stream.
293 uint64_t timebase() { return s_timebase; }
294 //Set base time in samples for stream.
295 void timebase(uint64_t ts) { s_timebase = ts; }
296 //Get length of stream in samples.
297 uint64_t length()
299 if(pregap_length + postgap_length > total_len)
300 return 0;
301 else
302 return total_len - pregap_length - postgap_length;
304 //Set the pregap length.
305 void set_pregap(uint32_t p) { pregap_length = p; }
306 //Get the pregap length.
307 uint32_t get_pregap() { return pregap_length; }
308 //Set the postgap length.
309 void set_potsgap(uint32_t p) { postgap_length = p; }
310 //Get the postgap length.
311 uint32_t get_postgap() { return postgap_length; }
312 //Set gain.
313 void set_gain(int16_t g) { gain = g; }
314 //Get gain.
315 int16_t get_gain() { return gain; }
316 //Get linear gain.
317 float get_gain_linear() { return pow(10, gain / 20); }
318 //Get number of packets in stream.
319 uint32_t blocks() { return packets.size(); }
320 //Is this stream locked?
321 bool islocked() { return locked; }
322 //Lock a stream.
323 void lock() { locked = true; }
324 //Unlock a stream.
325 void unlock() { locked = false; }
326 //Increment reference count.
327 void get_ref() { threads::alock m(reflock); refcount++; }
328 //Decrement reference count, destroying object if it hits zero.
329 void put_ref() { threads::alock m(reflock); refcount--; if(!refcount) destroy(); }
330 //Add new packet into stream.
331 //Not safe to call simultaneously with packet_length() or packet().
332 //Can throw.
333 void write(uint8_t len, const unsigned char* payload, size_t payload_len);
334 //Write stream trailer.
335 void write_trailier();
336 //Get clusters.
337 std::pair<uint32_t, uint32_t> get_clusters() { return std::make_pair(ctrl_cluster, data_cluster); }
338 private:
339 void export_stream_sox(std::ofstream& data);
340 void export_stream_oggopus(std::ofstream& data);
341 void import_stream_sox(std::ifstream& data, settingvar::group& settings);
342 void import_stream_oggopus(std::ifstream& data);
344 opus_stream(const opus_stream&);
345 opus_stream& operator=(const opus_stream&);
346 void destroy();
347 filesystem::ref fs;
348 std::vector<opus_packetinfo> packets;
349 uint64_t total_len;
350 uint64_t s_timebase;
351 uint32_t next_cluster;
352 uint32_t next_offset;
353 uint32_t next_mcluster;
354 uint32_t next_moffset;
355 uint32_t ctrl_cluster;
356 uint32_t data_cluster;
357 uint32_t pregap_length;
358 uint32_t postgap_length;
359 int16_t gain;
360 bool locked;
361 threads::lock reflock;
362 unsigned refcount;
363 bool deleting;
366 opus_stream::opus_stream(uint64_t base, filesystem::ref filesys)
367 : fs(filesys)
369 refcount = 1;
370 deleting = false;
371 total_len = 0;
372 s_timebase = base;
373 locked = false;
374 next_cluster = 0;
375 next_mcluster = 0;
376 next_offset = 0;
377 next_moffset = 0;
378 ctrl_cluster = 0;
379 data_cluster = 0;
380 pregap_length = 0;
381 postgap_length = 0;
382 gain = 0;
385 opus_stream::opus_stream(uint64_t base, filesystem::ref filesys, uint32_t _ctrl_cluster,
386 uint32_t _data_cluster)
387 : fs(filesys)
389 refcount = 1;
390 deleting = false;
391 total_len = 0;
392 s_timebase = base;
393 locked = false;
394 next_cluster = data_cluster = _data_cluster;
395 next_mcluster = ctrl_cluster = _ctrl_cluster;
396 next_offset = 0;
397 next_moffset = 0;
398 pregap_length = 0;
399 postgap_length = 0;
400 gain = 0;
401 //Read the data buffers.
402 char buf[CLUSTER_SIZE];
403 uint32_t last_cluster_seen = next_mcluster;
404 uint64_t total_size = 0;
405 uint64_t total_frames = 0;
406 bool trailers = false;
407 bool saved_pointer_valid = false;
408 uint32_t saved_next_mcluster = 0;
409 uint32_t saved_next_moffset = 0;
410 while(true) {
411 last_cluster_seen = next_mcluster;
412 size_t r = fs.read_data(next_mcluster, next_moffset, buf, CLUSTER_SIZE);
413 if(!r) {
414 //The stream ends here.
415 break;
417 //Find the first unused entry if any.
418 for(unsigned i = 0; i < CLUSTER_SIZE; i += 4)
419 if(!buf[i + 3] || trailers) {
420 //This entry is unused. If the next entry is also unused, that is the end.
421 //Otherwise, there might be stream trailers.
422 if(trailers && !buf[i + 3]) {
423 goto out_parsing; //Ends for real.
425 if(!trailers) {
426 //Set the trailer flag and continue parsing.
427 //The saved offset must be placed here.
428 saved_next_mcluster = last_cluster_seen;
429 saved_next_moffset = i;
430 saved_pointer_valid = true;
431 trailers = true;
432 continue;
434 //This is a trailer entry.
435 if(buf[i + 3] == 2) {
436 //Pregap.
437 pregap_length = serialization::u32b(buf + i) >> 8;
438 } else if(buf[i + 3] == 3) {
439 //Postgap.
440 postgap_length = serialization::u32b(buf + i) >> 8;
441 } else if(buf[i + 3] == 4) {
442 //Gain.
443 gain = serialization::s16b(buf + i);
445 } else {
446 uint16_t psize = serialization::u16b(buf + i);
447 uint8_t plen = serialization::u8b(buf + i + 2);
448 total_size += psize;
449 total_len += 120 * plen;
450 opus_packetinfo p(psize, plen, 1ULL * next_cluster * CLUSTER_SIZE +
451 next_offset);
452 size_t r2 = fs.skip_data(next_cluster, next_offset, psize);
453 if(r2 < psize)
454 throw std::runtime_error("Incomplete data stream");
455 packets.push_back(p);
456 total_frames++;
459 out_parsing:
460 //If saved pointer is valid, restore to that.
461 if(saved_pointer_valid) {
462 next_mcluster = saved_next_mcluster;
463 next_moffset = saved_next_moffset;
467 opus_stream::opus_stream(uint64_t base, filesystem::ref filesys, std::ifstream& data,
468 voice_commentary::external_stream_format extfmt, settingvar::group& settings)
469 : fs(filesys)
471 refcount = 1;
472 deleting = false;
473 total_len = 0;
474 s_timebase = base;
475 locked = false;
476 next_cluster = 0;
477 next_mcluster = 0;
478 next_offset = 0;
479 next_moffset = 0;
480 ctrl_cluster = 0;
481 data_cluster = 0;
482 pregap_length = 0;
483 postgap_length = 0;
484 gain = 0;
485 if(extfmt == voice_commentary::EXTFMT_OGGOPUS)
486 import_stream_oggopus(data);
487 else if(extfmt == voice_commentary::EXTFMT_SOX)
488 import_stream_sox(data, settings);
491 void opus_stream::import_stream_oggopus(std::ifstream& data)
493 ogg::stream_reader_iostreams reader(data);
494 reader.set_errors_to(messages);
495 struct opus::ogg_header h;
496 struct opus::ogg_tags t;
497 ogg::page page;
498 ogg::demuxer d(messages);
499 int state = 0;
500 postgap_length = 0;
501 uint64_t datalen = 0;
502 uint64_t last_datalen = 0;
503 uint64_t last_granulepos = 0;
504 try {
505 while(true) {
506 ogg::packet p;
507 if(!d.wants_packet_out()) {
508 if(!reader.get_page(page))
509 break;
510 d.page_in(page);
511 continue;
512 } else
513 d.packet_out(p);
514 switch(state) {
515 case 0: //Not locked.
516 h.parse(p);
517 if(h.streams != 1)
518 throw std::runtime_error("Multistream OggOpus streams are not "
519 "supported");
520 state = 1; //Expecting comment.
521 pregap_length = h.preskip;
522 gain = h.gain;
523 break;
524 case 1: //Expecting comment.
525 t.parse(p);
526 state = 2; //Data page.
527 if(page.get_eos())
528 throw std::runtime_error("Empty OggOpus stream");
529 break;
530 case 2: //Data page.
531 case 3: //Data page.
532 const std::vector<uint8_t>& pkt = p.get_vector();
533 uint8_t tcnt = opus::packet_tick_count(&pkt[0], pkt.size());
534 if(tcnt) {
535 write(tcnt, &pkt[0], pkt.size());
536 datalen += tcnt * 120;
538 if(p.get_last_page()) {
539 uint64_t samples = p.get_granulepos() - last_granulepos;
540 if(samples > p.get_granulepos())
541 samples = 0;
542 uint64_t rsamples = datalen - last_datalen;
543 bool eos = p.get_on_eos_page();
544 if((samples > rsamples && (state == 3 || eos)) || (samples <
545 rsamples && !eos))
546 messages << "Warning: Granulepos says there are "
547 << samples << " samples, found " << rsamples
548 << std::endl;
549 last_datalen = datalen;
550 last_granulepos = p.get_granulepos();
551 if(p.get_on_eos_page()) {
552 if(samples < rsamples)
553 postgap_length = rsamples - samples;
554 state = 4;
555 goto out;
558 state = 3;
559 break;
562 out:
563 if(state == 0)
564 throw std::runtime_error("No OggOpus stream found");
565 if(state == 1)
566 throw std::runtime_error("Oggopus stream missing required tags pages");
567 if(state == 2 || state == 3)
568 messages << "Warning: Incomplete Oggopus stream." << std::endl;
569 if(datalen <= pregap_length)
570 throw std::runtime_error("Stream too short (entiere pregap not present)");
571 write_trailier();
572 } catch(...) {
573 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
574 if(data_cluster) fs.free_cluster_chain(data_cluster);
575 throw;
579 void opus_stream::import_stream_sox(std::ifstream& data, settingvar::group& settings)
581 bitrate_tracker brtrack;
582 unsigned char tmpi[65536];
583 float tmp[OPUS_MAX_OUT];
584 char header[260];
585 data.read(header, 32);
586 if(!data)
587 throw std::runtime_error("Can't read .sox header");
588 if(serialization::u32l(header + 0) != 0x586F532EULL)
589 throw std::runtime_error("Bad .sox header magic");
590 if(serialization::u8b(header + 4) > 28)
591 data.read(header + 32, serialization::u8b(header + 4) - 28);
592 if(!data)
593 throw std::runtime_error("Can't read .sox header");
594 if(serialization::u64l(header + 16) != 4676829883349860352ULL)
595 throw std::runtime_error("Bad .sox sampling rate");
596 if(serialization::u32l(header + 24) != 1)
597 throw std::runtime_error("Only mono streams are supported");
598 uint64_t samples = serialization::u64l(header + 8);
599 opus::encoder enc(opus::samplerate::r48k, false, opus::application::voice);
600 enc.ctl(opus::bitrate(SET_opus_bitrate(settings)));
601 int32_t pregap = enc.ctl(opus::lookahead);
602 pregap_length = pregap;
603 for(uint64_t i = 0; i < samples + pregap; i += OPUS_BLOCK_SIZE) {
604 size_t bs = OPUS_BLOCK_SIZE;
605 if(i + bs > samples + pregap)
606 bs = samples + pregap - i;
607 //We have to read zero bytes after the end of stream.
608 size_t readable = bs;
609 if(readable + i > samples)
610 readable = max(samples, i) - i;
611 if(readable > 0)
612 data.read(reinterpret_cast<char*>(tmpi), 4 * readable);
613 if(readable < bs)
614 memset(tmpi + 4 * readable, 0, 4 * (bs - readable));
615 if(!data) {
616 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
617 if(data_cluster) fs.free_cluster_chain(data_cluster);
618 throw std::runtime_error("Can't read .sox data");
620 for(size_t j = 0; j < bs; j++)
621 tmp[j] = static_cast<float>(serialization::s32l(tmpi + 4 * j)) / 268435456;
622 if(bs < OPUS_BLOCK_SIZE)
623 postgap_length = OPUS_BLOCK_SIZE - bs;
624 for(size_t j = bs; j < OPUS_BLOCK_SIZE; j++)
625 tmp[j] = 0;
626 try {
627 const size_t opus_out_max2 = SET_opus_max_bitrate(settings) *
628 OPUS_BLOCK_SIZE / 384000;
629 size_t r = enc.encode(tmp, OPUS_BLOCK_SIZE, tmpi, opus_out_max2);
630 write(OPUS_BLOCK_SIZE / 120, tmpi, r);
631 brtrack.submit(r, bs);
632 } catch(std::exception& e) {
633 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
634 if(data_cluster) fs.free_cluster_chain(data_cluster);
635 (stringfmt() << "Error encoding opus packet: " << e.what()).throwex();
638 messages << "Imported stream: " << brtrack;
639 try {
640 write_trailier();
641 } catch(...) {
642 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
643 if(data_cluster) fs.free_cluster_chain(data_cluster);
644 throw;
648 void opus_stream::destroy()
650 if(deleting) {
651 //We catch the errors and print em, because otherwise put_ref could throw, which would
652 //be too much.
653 try {
654 fs.free_cluster_chain(ctrl_cluster);
655 } catch(std::exception& e) {
656 messages << "Failed to delete stream control file: " << e.what();
658 try {
659 fs.free_cluster_chain(data_cluster);
660 } catch(std::exception& e) {
661 messages << "Failed to delete stream data file: " << e.what();
664 delete this;
667 void opus_stream::export_stream_oggopus(std::ofstream& data)
669 if(!packets.size())
670 throw std::runtime_error("Empty oggopus stream is not valid");
671 opus::ogg_header header;
672 opus::ogg_tags tags;
673 ogg::stream_writer_iostreams writer(data);
674 unsigned stream_id = 1;
675 uint64_t true_granule = 0;
676 uint32_t seq = 2;
677 //Headers / Tags.
678 header.version = 1;
679 header.channels = 1;
680 header.preskip = pregap_length;
681 header.rate = OPUS_SAMPLERATE;
682 header.gain = gain;
683 header.map_family = 0;
684 header.streams = 1;
685 header.coupled = 0;
686 header.chanmap[0] = 0;
687 memset(header.chanmap + 1, 255, 254);
688 tags.vendor = "unknown";
689 tags.comments.push_back((stringfmt() << "ENCODER=lsnes rr" + lsnes_version).str());
690 tags.comments.push_back((stringfmt() << "LSNES_STREAM_TS=" << s_timebase).str());
692 struct ogg::page hpage = header.serialize();
693 hpage.set_stream(stream_id);
694 writer.put_page(hpage);
695 seq = tags.serialize([&writer](const ogg::page& p) { writer.put_page(p); }, stream_id);
697 struct ogg::page ppage;
698 ogg::muxer mux(stream_id, seq);
699 for(size_t i = 0; i < packets.size(); i++) {
700 std::vector<unsigned char> p;
701 try {
702 p = packet(i);
703 } catch(std::exception& e) {
704 (stringfmt() << "Error reading opus packet: " << e.what()).throwex();
706 if(!p.size())
707 (stringfmt() << "Empty Opus packet is not valid").throwex();
708 uint32_t samples = static_cast<uint32_t>(opus::packet_tick_count(&p[0], p.size())) * 120;
709 if(i + 1 < packets.size())
710 true_granule += samples;
711 else
712 true_granule = max(true_granule, true_granule + samples - postgap_length);
713 if(!mux.wants_packet_in() || !mux.packet_fits(p.size()))
714 while(mux.has_page_out()) {
715 mux.page_out(ppage);
716 writer.put_page(ppage);
718 mux.packet_in(p, true_granule);
720 mux.signal_eos();
721 while(mux.has_page_out()) {
722 mux.page_out(ppage);
723 writer.put_page(ppage);
727 void opus_stream::export_stream_sox(std::ofstream& data)
729 opus::decoder dec(opus::samplerate::r48k, false);
730 std::vector<unsigned char> p;
731 float tmp[OPUS_MAX_OUT];
732 char header[32];
733 serialization::u64l(header, 0x1C586F532EULL); //Magic and header size.
734 serialization::u64l(header + 16, 4676829883349860352ULL); //Sampling rate.
735 serialization::u32l(header + 24, 1);
736 uint64_t tlen = 0;
737 uint32_t lookahead_thrown = 0;
738 data.write(header, 32);
739 if(!data)
740 throw std::runtime_error("Error writing PCM data.");
741 float lgain = get_gain_linear();
742 for(size_t i = 0; i < packets.size(); i++) {
743 char blank[4] = {0, 0, 0, 0};
744 try {
745 uint32_t pregap_throw = 0;
746 uint32_t postgap_throw = 0;
747 std::vector<unsigned char> p = packet(i);
748 uint32_t len = packet_length(i);
749 dec.decode(&p[0], p.size(), tmp, OPUS_MAX_OUT);
750 bool is_last = (i == packets.size() - 1);
751 if(lookahead_thrown < pregap_length) {
752 //We haven't yet thrown the full pregap. Throw some.
753 uint32_t maxthrow = pregap_length - lookahead_thrown;
754 pregap_throw = min(len, maxthrow);
755 lookahead_thrown += pregap_length;
757 if(is_last)
758 postgap_throw = min(len - pregap_throw, postgap_length);
759 tlen += (len - pregap_throw - postgap_throw);
760 for(uint32_t j = pregap_throw; j < len - postgap_throw; j++) {
761 int32_t s = (int32_t)(tmp[j] * lgain * 268435456.0);
762 serialization::s32l(blank, s);
763 data.write(blank, 4);
764 if(!data)
765 throw std::runtime_error("Error writing PCM data.");
767 } catch(std::exception& e) {
768 (stringfmt() << "Error decoding opus packet: " << e.what()).throwex();
771 data.seekp(0, std::ios_base::beg);
772 serialization::u64l(header + 8, tlen);
773 data.write(header, 32);
774 if(!data) {
775 throw std::runtime_error("Error writing PCM data.");
779 void opus_stream::export_stream(std::ofstream& data, voice_commentary::external_stream_format extfmt)
781 if(extfmt == voice_commentary::EXTFMT_OGGOPUS)
782 export_stream_oggopus(data);
783 else if(extfmt == voice_commentary::EXTFMT_SOX)
784 export_stream_sox(data);
787 void opus_stream::write(uint8_t len, const unsigned char* payload, size_t payload_len)
789 try {
790 char descriptor[4];
791 uint32_t used_cluster, used_offset;
792 uint32_t used_mcluster, used_moffset;
793 if(!next_cluster)
794 next_cluster = data_cluster = fs.allocate_cluster();
795 if(!next_mcluster)
796 next_mcluster = ctrl_cluster = fs.allocate_cluster();
797 serialization::u16b(descriptor, payload_len);
798 serialization::u8b(descriptor + 2, len);
799 serialization::u8b(descriptor + 3, 1);
800 fs.write_data(next_cluster, next_offset, payload, payload_len, used_cluster, used_offset);
801 fs.write_data(next_mcluster, next_moffset, descriptor, 4, used_mcluster, used_moffset);
802 uint64_t off = static_cast<uint64_t>(used_cluster) * CLUSTER_SIZE + used_offset;
803 opus_packetinfo p(payload_len, len, off);
804 total_len += p.length();
805 packets.push_back(p);
806 } catch(std::exception& e) {
807 (stringfmt() << "Can't write opus packet: " << e.what()).throwex();
811 void opus_stream::write_trailier()
813 try {
814 char descriptor[16];
815 uint32_t used_mcluster, used_moffset;
816 //The allocation must be done for real.
817 if(!next_mcluster)
818 next_mcluster = ctrl_cluster = fs.allocate_cluster();
819 //But the write must not update the pointers..
820 uint32_t tmp_mcluster = next_mcluster;
821 uint32_t tmp_moffset = next_moffset;
822 serialization::u32b(descriptor, 0);
823 serialization::u32b(descriptor + 4, (pregap_length << 8) | 0x02);
824 serialization::u32b(descriptor + 8, (postgap_length << 8) | 0x03);
825 serialization::s16b(descriptor + 12, gain);
826 serialization::u16b(descriptor + 14, 0x0004);
827 fs.write_data(tmp_mcluster, tmp_moffset, descriptor, 16, used_mcluster, used_moffset);
828 } catch(std::exception& e) {
829 (stringfmt() << "Can't write stream trailer: " << e.what()).throwex();
834 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
835 //Playing opus stream.
836 struct opus_playback_stream
838 //Create a new playing stream from given opus stream.
839 opus_playback_stream(opus_stream& data);
840 //Destroy playing opus stream.
841 ~opus_playback_stream();
842 //Read samples from stream.
843 //Can throw.
844 void read(float* data, size_t samples);
845 //Skip samples from stream.
846 //Can throw.
847 void skip(uint64_t samples);
848 //Has the stream already ended?
849 bool eof();
850 private:
851 opus_playback_stream(const opus_playback_stream&);
852 opus_playback_stream& operator=(const opus_playback_stream&);
853 //Can throw.
854 void decode_block();
855 float output[OPUS_MAX_OUT];
856 unsigned output_left;
857 uint32_t pregap_thrown;
858 bool postgap_thrown;
859 opus::decoder* decoder;
860 opus_stream& stream;
861 uint32_t next_block;
862 uint32_t blocks;
865 opus_playback_stream::opus_playback_stream(opus_stream& data)
866 : stream(data)
868 stream.get_ref();
869 stream.lock();
870 next_block = 0;
871 output_left = 0;
872 pregap_thrown = 0;
873 postgap_thrown = false;
874 blocks = stream.blocks();
875 decoder = new opus::decoder(opus::samplerate::r48k, false);
876 if(!decoder)
877 throw std::bad_alloc();
880 opus_playback_stream::~opus_playback_stream()
882 //No, we don't unlock the stream.
883 stream.put_ref();
884 delete decoder;
887 bool opus_playback_stream::eof()
889 return (next_block >= blocks && !output_left);
892 void opus_playback_stream::decode_block()
894 if(next_block >= blocks)
895 return;
896 if(output_left >= OPUS_MAX_OUT)
897 return;
898 unsigned plen = stream.packet_length(next_block);
899 if(plen + output_left > OPUS_MAX_OUT)
900 return;
901 std::vector<unsigned char> pdata = stream.packet(next_block);
902 try {
903 size_t c = decoder->decode(&pdata[0], pdata.size(), output + output_left,
904 OPUS_MAX_OUT - output_left);
905 output_left = min(output_left + c, static_cast<size_t>(OPUS_MAX_OUT));
906 } catch(...) {
907 //Bad packet, insert silence.
908 for(unsigned i = 0; i < plen; i++)
909 output[output_left++] = 0;
911 //Throw the pregap away if needed.
912 if(pregap_thrown < stream.get_pregap()) {
913 uint32_t throw_amt = min(stream.get_pregap() - pregap_thrown, (uint32_t)output_left);
914 if(throw_amt && throw_amt < output_left)
915 memmove(output, output + throw_amt, (output_left - throw_amt) * sizeof(float));
916 output_left -= throw_amt;
917 pregap_thrown += throw_amt;
919 next_block++;
922 void opus_playback_stream::read(float* data, size_t samples)
924 float lgain = stream.get_gain_linear();
925 while(samples > 0) {
926 decode_block();
927 if(next_block >= blocks && !postgap_thrown) {
928 //This is the final packet. Throw away postgap samples at the end.
929 uint32_t thrown = min(stream.get_postgap(), (uint32_t)output_left);
930 output_left -= thrown;
931 postgap_thrown = true;
933 if(next_block >= blocks && !output_left) {
934 //Zerofill remainder.
935 for(size_t i = 0; i < samples; i++)
936 data[i] = 0;
937 return;
939 unsigned maxcopy = min(static_cast<unsigned>(samples), output_left);
940 if(maxcopy) {
941 memcpy(data, output, maxcopy * sizeof(float));
942 for(size_t i = 0; i < maxcopy; i++)
943 data[i] *= lgain;
945 if(maxcopy < output_left && maxcopy)
946 memmove(output, output + maxcopy, (output_left - maxcopy) * sizeof(float));
947 output_left -= maxcopy;
948 samples -= maxcopy;
949 data += maxcopy;
953 void opus_playback_stream::skip(uint64_t samples)
955 //Adjust for preskip and declare all preskip already thrown away.
956 pregap_thrown = stream.get_pregap();
957 samples += pregap_thrown;
958 postgap_thrown = false;
959 //First, skip inside decoded samples.
960 if(samples < output_left) {
961 //Skipping less than amount in output buffer. Just discard from output buffer and try
962 //to decode a new block.
963 memmove(output, output + samples, (output_left - samples) * sizeof(float));
964 output_left -= samples;
965 decode_block();
966 return;
967 } else {
968 //Skipping at least the amount of samples in output buffer. First, blank the output buffer
969 //and count those towards samples discarded.
970 samples -= output_left;
971 output_left = 0;
973 //While number of samples is so great that adequate convergence period can be ensured without
974 //decoding this packet, just skip the samples from the packet.
975 while(samples > OPUS_CONVERGE_MAX) {
976 samples -= stream.packet_length(next_block++);
977 //Did we hit EOF?
978 if(next_block >= blocks)
979 return;
981 //Okay, we are near the point. Start decoding packets.
982 while(samples > 0) {
983 decode_block();
984 //Did we hit EOF?
985 if(next_block >= blocks && !output_left)
986 return;
987 //Skip as many samples as possible.
988 unsigned maxskip = min(static_cast<unsigned>(samples), output_left);
989 if(maxskip < output_left)
990 memmove(output, output + maxskip, (output_left - maxskip) * sizeof(float));
991 output_left -= maxskip;
992 samples -= maxskip;
994 //Just to be nice, decode a extra block.
995 decode_block();
999 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1000 //Collection of streams.
1001 struct stream_collection
1003 public:
1004 //Create a new collection.
1005 //Can throw.
1006 stream_collection(filesystem::ref filesys);
1007 //Destroy a collection. All streams are destroyed but not deleted.
1008 ~stream_collection();
1009 //Get list of streams active at given point.
1010 std::list<uint64_t> streams_at(uint64_t point);
1011 //Add a stream into collection.
1012 //Can throw.
1013 uint64_t add_stream(opus_stream& stream);
1014 //Get the filesystem this collection is for.
1015 filesystem::ref get_filesystem() { return fs; }
1016 //Unlock all streams in collection.
1017 void unlock_all();
1018 //Get stream with given index (NULL if not found).
1019 opus_stream* get_stream(uint64_t index)
1021 threads::alock m(mlock);
1022 if(streams.count(index)) {
1023 streams[index]->get_ref();
1024 return streams[index];
1026 return NULL;
1028 //Delete a stream.
1029 //Can throw.
1030 void delete_stream(uint64_t index);
1031 //Alter stream timebase.
1032 //Can throw.
1033 void alter_stream_timebase(uint64_t index, uint64_t newts);
1034 //Alter stream gain.
1035 void alter_stream_gain(uint64_t index, uint16_t newgain);
1036 //Enumerate all valid stream indices, in time order.
1037 std::list<uint64_t> all_streams();
1038 //Export the entiere superstream.
1039 //Can throw.
1040 void export_superstream(std::ofstream& out);
1041 private:
1042 filesystem::ref fs;
1043 uint64_t next_index;
1044 unsigned next_stream;
1045 threads::lock mlock;
1046 std::set<uint64_t> free_indices;
1047 std::map<uint64_t, uint64_t> entries;
1048 std::multimap<uint64_t, uint64_t> streams_by_time;
1049 //FIXME: Something more efficient.
1050 std::map<uint64_t, opus_stream*> streams;
1053 stream_collection::stream_collection(filesystem::ref filesys)
1054 : fs(filesys)
1056 next_stream = 0;
1057 next_index = 0;
1058 //The stream index table is in cluster 2.
1059 uint32_t next_cluster = 2;
1060 uint32_t next_offset = 0;
1061 uint32_t i = 0;
1062 try {
1063 while(true) {
1064 char buffer[16];
1065 size_t r = fs.read_data(next_cluster, next_offset, buffer, 16);
1066 if(r < 16)
1067 break;
1068 uint64_t timebase = serialization::u64b(buffer);
1069 uint32_t ctrl_cluster = serialization::u32b(buffer + 8);
1070 uint32_t data_cluster = serialization::u32b(buffer + 12);
1071 if(ctrl_cluster) {
1072 opus_stream* x = new opus_stream(timebase, fs, ctrl_cluster, data_cluster);
1073 entries[next_index] = i;
1074 streams_by_time.insert(std::make_pair(timebase, next_index));
1075 streams[next_index++] = x;
1076 } else
1077 free_indices.insert(i);
1078 next_stream = ++i;
1080 } catch(std::exception& e) {
1081 for(auto i : streams)
1082 i.second->put_ref();
1083 (stringfmt() << "Failed to parse LSVS: " << e.what()).throwex();
1087 stream_collection::~stream_collection()
1089 threads::alock m(mlock);
1090 for(auto i : streams)
1091 i.second->put_ref();
1092 streams.clear();
1095 std::list<uint64_t> stream_collection::streams_at(uint64_t point)
1097 threads::alock m(mlock);
1098 std::list<uint64_t> s;
1099 for(auto i : streams) {
1100 uint64_t start = i.second->timebase();
1101 uint64_t end = start + i.second->length();
1102 if(point >= start && point < end) {
1103 i.second->get_ref();
1104 s.push_back(i.first);
1107 return s;
1110 uint64_t stream_collection::add_stream(opus_stream& stream)
1112 uint64_t idx;
1113 try {
1114 threads::alock m(mlock);
1115 //Lock the added stream so it doesn't start playing back immediately.
1116 stream.lock();
1117 idx = next_index++;
1118 streams[idx] = &stream;
1119 char buffer[16];
1120 serialization::u64b(buffer, stream.timebase());
1121 auto r = stream.get_clusters();
1122 serialization::u32b(buffer + 8, r.first);
1123 serialization::u32b(buffer + 12, r.second);
1124 uint64_t entry_number = 0;
1125 if(free_indices.empty())
1126 entry_number = next_stream++;
1127 else {
1128 entry_number = *free_indices.begin();
1129 free_indices.erase(entry_number);
1131 uint32_t write_cluster = 2;
1132 uint32_t write_offset = 0;
1133 uint32_t dummy1, dummy2;
1134 fs.skip_data(write_cluster, write_offset, 16 * entry_number);
1135 fs.write_data(write_cluster, write_offset, buffer, 16, dummy1, dummy2);
1136 streams_by_time.insert(std::make_pair(stream.timebase(), idx));
1137 entries[idx] = entry_number;
1138 return idx;
1139 } catch(std::exception& e) {
1140 (stringfmt() << "Failed to add stream: " << e.what()).throwex();
1142 return idx;
1145 void stream_collection::unlock_all()
1147 threads::alock m(mlock);
1148 for(auto i : streams)
1149 i.second->unlock();
1152 void stream_collection::delete_stream(uint64_t index)
1154 threads::alock m(mlock);
1155 if(!entries.count(index))
1156 return;
1157 uint64_t entry_number = entries[index];
1158 uint32_t write_cluster = 2;
1159 uint32_t write_offset = 0;
1160 uint32_t dummy1, dummy2;
1161 char buffer[16] = {0};
1162 fs.skip_data(write_cluster, write_offset, 16 * entry_number);
1163 fs.write_data(write_cluster, write_offset, buffer, 16, dummy1, dummy2);
1164 auto itr = streams_by_time.lower_bound(streams[index]->timebase());
1165 auto itr2 = streams_by_time.upper_bound(streams[index]->timebase());
1166 for(auto x = itr; x != itr2; x++)
1167 if(x->second == index) {
1168 streams_by_time.erase(x);
1169 break;
1171 streams[index]->delete_stream();
1172 streams.erase(index);
1175 void stream_collection::alter_stream_timebase(uint64_t index, uint64_t newts)
1177 try {
1178 threads::alock m(mlock);
1179 if(!streams.count(index))
1180 return;
1181 if(entries.count(index)) {
1182 char buffer[8];
1183 uint32_t write_cluster = 2;
1184 uint32_t write_offset = 0;
1185 uint32_t dummy1, dummy2;
1186 serialization::u64b(buffer, newts);
1187 fs.skip_data(write_cluster, write_offset, 16 * entries[index]);
1188 fs.write_data(write_cluster, write_offset, buffer, 8, dummy1, dummy2);
1190 auto itr = streams_by_time.lower_bound(streams[index]->timebase());
1191 auto itr2 = streams_by_time.upper_bound(streams[index]->timebase());
1192 for(auto x = itr; x != itr2; x++)
1193 if(x->second == index) {
1194 streams_by_time.erase(x);
1195 break;
1197 streams[index]->timebase(newts);
1198 streams_by_time.insert(std::make_pair(newts, index));
1199 } catch(std::exception& e) {
1200 (stringfmt() << "Failed to alter stream timebase: " << e.what()).throwex();
1204 void stream_collection::alter_stream_gain(uint64_t index, uint16_t newgain)
1206 try {
1207 threads::alock m(mlock);
1208 if(!streams.count(index))
1209 return;
1210 streams[index]->set_gain(newgain);
1211 streams[index]->write_trailier();
1212 } catch(std::exception& e) {
1213 (stringfmt() << "Failed to alter stream gain: " << e.what()).throwex();
1217 std::list<uint64_t> stream_collection::all_streams()
1219 threads::alock m(mlock);
1220 std::list<uint64_t> s;
1221 for(auto i : streams_by_time)
1222 s.push_back(i.second);
1223 return s;
1226 void stream_collection::export_superstream(std::ofstream& out)
1228 std::list<uint64_t> slist = all_streams();
1229 //Find the total length of superstream.
1230 uint64_t len = 0;
1231 for(auto i : slist) {
1232 opus_stream* s = get_stream(i);
1233 if(s) {
1234 len = max(len, s->timebase() + s->length());
1235 s->put_ref();
1238 char header[32];
1239 serialization::u64l(header, 0x1C586F532EULL); //Magic and header size.
1240 serialization::u64l(header + 8, len);
1241 serialization::u64l(header + 16, 4676829883349860352ULL); //Sampling rate.
1242 serialization::u64l(header + 24, 1);
1243 out.write(header, 32);
1244 if(!out)
1245 throw std::runtime_error("Error writing PCM output");
1247 //Find the first valid stream.
1248 auto next_i = slist.begin();
1249 opus_stream* next_stream = NULL;
1250 while(next_i != slist.end()) {
1251 next_stream = get_stream(*next_i);
1252 next_i++;
1253 if(next_stream)
1254 break;
1256 uint64_t next_ts;
1257 next_ts = next_stream ? next_stream->timebase() : len;
1259 std::list<opus_playback_stream*> active;
1260 try {
1261 for(uint64_t s = 0; s < len;) {
1262 if(s == next_ts) {
1263 active.push_back(new opus_playback_stream(*next_stream));
1264 next_stream->put_ref();
1265 next_stream = NULL;
1266 while(next_i != slist.end()) {
1267 next_stream = get_stream(*next_i);
1268 next_i++;
1269 if(!next_stream)
1270 continue;
1271 uint64_t next_ts = next_stream->timebase();
1272 if(next_ts > s)
1273 break;
1274 //Okay, this starts too...
1275 active.push_back(new opus_playback_stream(*next_stream));
1276 next_stream->put_ref();
1277 next_stream = NULL;
1279 next_ts = next_stream ? next_stream->timebase() : len;
1281 uint64_t maxsamples = min(next_ts - s, static_cast<uint64_t>(OUTPUT_BLOCK));
1282 maxsamples = min(maxsamples, len - s);
1283 char outbuf[4 * OUTPUT_BLOCK];
1284 float buf1[OUTPUT_BLOCK];
1285 float buf2[OUTPUT_BLOCK];
1286 for(size_t t = 0; t < maxsamples; t++)
1287 buf1[t] = 0;
1288 for(auto t : active) {
1289 t->read(buf2, maxsamples);
1290 for(size_t u = 0; u < maxsamples; u++)
1291 buf1[u] += buf2[u];
1293 for(auto t = active.begin(); t != active.end();) {
1294 if((*t)->eof()) {
1295 auto todel = t;
1296 t++;
1297 delete *todel;
1298 active.erase(todel);
1299 } else
1300 t++;
1302 for(size_t t = 0; t < maxsamples; t++)
1303 serialization::s32l(outbuf + 4 * t, buf1[t] * 268435456);
1304 out.write(outbuf, 4 * maxsamples);
1305 if(!out)
1306 throw std::runtime_error("Failed to write PCM");
1307 s += maxsamples;
1309 } catch(std::exception& e) {
1310 (stringfmt() << "Failed to export PCM: " << e.what()).throwex();
1312 for(auto t = active.begin(); t != active.end();) {
1313 if((*t)->eof()) {
1314 auto todelete = t;
1315 t++;
1316 delete *todelete;
1317 active.erase(todelete);
1318 } else
1319 t++;
1323 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1324 void voicesub_state::start_management_stream(opus_stream& s)
1326 opus_playback_stream* p = new opus_playback_stream(s);
1327 threads::alock m(active_playback_streams_lock);
1328 active_playback_streams.push_back(p);
1331 void voicesub_state::advance_time(uint64_t newtime)
1333 threads::alock m2(current_collection_lock);
1334 if(!current_collection) {
1335 //Clear all.
1336 threads::alock m(active_playback_streams_lock);
1337 for(auto i : active_playback_streams)
1338 delete i;
1339 active_playback_streams.clear();
1340 return;
1342 std::list<uint64_t> sactive = current_collection->streams_at(newtime);
1343 for(auto j : sactive) {
1344 opus_stream* i = current_collection->get_stream(j);
1345 if(!i)
1346 continue;
1347 //Don't play locked streams in order to avoid double playing.
1348 threads::alock m(active_playback_streams_lock);
1349 try {
1350 if(!i->islocked())
1351 active_playback_streams.push_back(new opus_playback_stream(*i));
1352 } catch(std::exception& e) {
1353 messages << "Can't start stream: " << e.what() << std::endl;
1355 i->put_ref();
1359 void voicesub_state::jump_time(uint64_t newtime)
1361 threads::alock m2(current_collection_lock);
1362 if(!current_collection) {
1363 //Clear all.
1364 threads::alock m(active_playback_streams_lock);
1365 for(auto i : active_playback_streams)
1366 delete i;
1367 active_playback_streams.clear();
1368 return;
1370 //Close all currently playing streams.
1372 threads::alock m(active_playback_streams_lock);
1373 for(auto i : active_playback_streams)
1374 delete i;
1375 active_playback_streams.clear();
1377 //Unlock all streams, so they will play.
1378 current_collection->unlock_all();
1379 //Reopen all streams that should be open (with seeking)
1380 std::list<uint64_t> sactive = current_collection->streams_at(newtime);
1381 for(auto j : sactive) {
1382 opus_stream* i = current_collection->get_stream(j);
1383 if(!i)
1384 continue;
1385 //No need to check for locks, because we just busted all of those.
1386 uint64_t p = newtime - i->timebase();
1387 opus_playback_stream* s;
1388 try {
1389 s = new opus_playback_stream(*i);
1390 } catch(std::exception& e) {
1391 messages << "Can't start stream: " << e.what() << std::endl;
1392 return;
1394 i->put_ref();
1395 if(!s)
1396 continue;
1397 s->skip(p);
1398 threads::alock m(active_playback_streams_lock);
1399 active_playback_streams.push_back(s);
1403 //Resample.
1404 void voicesub_state::do_resample(audioapi_instance::resampler& r, float* srcbuf, size_t& srcuse,
1405 float* dstbuf, size_t& dstuse, size_t dstmax, double ratio)
1407 if(srcuse == 0 || dstuse >= dstmax)
1408 return;
1409 float* in = srcbuf;
1410 size_t in_u = srcuse;
1411 float* out = dstbuf + dstuse;
1412 size_t out_u = dstmax - dstuse;
1413 r.resample(in, in_u, out, out_u, ratio, false);
1414 size_t offset = in - srcbuf;
1415 if(offset < srcuse)
1416 memmove(srcbuf, srcbuf + offset, sizeof(float) * (srcuse - offset));
1417 srcuse -= offset;
1418 dstuse = dstmax - out_u;
1421 //Drain the input buffer.
1422 void voicesub_state::drain_input()
1424 while(audio.voice_r_status() > 0) {
1425 float buf[256];
1426 unsigned size = min(audio.voice_r_status(), 256u);
1427 audio.record_voice(buf, size);
1431 //Read the input buffer.
1432 void voicesub_state::read_input(float* buf, size_t& use, size_t maxuse)
1434 size_t rleft = audio.voice_r_status();
1435 unsigned toread = min(rleft, max(maxuse, use) - use);
1436 if(toread > 0) {
1437 audio.record_voice(buf + use, toread);
1438 use += toread;
1442 //Compress Opus block.
1443 void voicesub_state::compress_opus_block(opus::encoder& e, float* buf, size_t& use,
1444 opus_stream& active_stream, bitrate_tracker& brtrack)
1446 const size_t opus_out_max = 1276;
1447 unsigned char opus_output[opus_out_max];
1448 size_t cblock = 0;
1449 if(use >= 960)
1450 cblock = 960;
1451 else if(use >= 480)
1452 cblock = 480;
1453 else if(use >= 240)
1454 cblock = 240;
1455 else if(use >= 120)
1456 cblock = 120;
1457 else
1458 return; //No valid data to compress.
1459 const size_t opus_out_max2 = SET_opus_max_bitrate(settings) * cblock / 384000;
1460 try {
1461 size_t c = e.encode(buf, cblock, opus_output, opus_out_max2);
1462 //Successfully compressed a block.
1463 size_t opus_output_len = c;
1464 brtrack.submit(c, cblock);
1465 try {
1466 active_stream.write(cblock / 120, opus_output, opus_output_len);
1467 } catch(std::exception& e) {
1468 messages << "Error writing data: " << e.what() << std::endl;
1470 } catch(std::exception& e) {
1471 messages << "Opus encoder error: " << e.what() << std::endl;
1473 use -= cblock;
1476 void voicesub_state::update_time()
1478 uint64_t sampletime = 0;
1479 bool jumping = false;
1481 threads::alock m(time_mutex);
1482 sampletime = current_time;
1483 jumping = time_jump;
1484 time_jump = false;
1486 if(jumping)
1487 jump_time(sampletime);
1488 else
1489 advance_time(sampletime);
1492 void voicesub_state::decompress_active_streams(float* out, size_t& use)
1494 size_t base = use;
1495 use += OUTPUT_BLOCK;
1496 for(unsigned i = 0; i < OUTPUT_BLOCK; i++)
1497 out[i + base] = 0;
1498 //Do it this way to minimize the amount of time playback streams lock
1499 //is held.
1500 std::list<opus_playback_stream*> stmp;
1502 threads::alock m(active_playback_streams_lock);
1503 stmp = active_playback_streams;
1505 std::set<opus_playback_stream*> toerase;
1506 for(auto i : stmp) {
1507 float tmp[OUTPUT_BLOCK];
1508 try {
1509 i->read(tmp, OUTPUT_BLOCK);
1510 } catch(std::exception& e) {
1511 messages << "Failed to decompress: " << e.what() << std::endl;
1512 for(unsigned j = 0; j < OUTPUT_BLOCK; j++)
1513 tmp[j] = 0;
1515 for(unsigned j = 0; j < OUTPUT_BLOCK; j++)
1516 out[j + base] += tmp[j];
1517 if(i->eof())
1518 toerase.insert(i);
1521 threads::alock m(active_playback_streams_lock);
1522 for(auto i = active_playback_streams.begin(); i != active_playback_streams.end();) {
1523 if(toerase.count(*i)) {
1524 auto toerase = i;
1525 i++;
1526 delete *toerase;
1527 active_playback_streams.erase(toerase);
1528 } else
1529 i++;
1534 void voicesub_state::handle_tangent_positive_edge(opus::encoder& e, opus_stream*& active_stream,
1535 bitrate_tracker& brtrack)
1537 threads::alock m2(current_collection_lock);
1538 if(!current_collection) {
1539 messages << "No file to save stream set" << std::endl;
1540 return;
1542 try {
1543 e.ctl(opus::reset);
1544 e.ctl(opus::bitrate(SET_opus_bitrate(settings)));
1545 brtrack.reset();
1546 uint64_t ctime;
1548 threads::alock m(time_mutex);
1549 ctime = current_time;
1551 active_stream = NULL;
1552 active_stream = new opus_stream(ctime, current_collection->get_filesystem());
1553 int32_t pregap = e.ctl(opus::lookahead);
1554 active_stream->set_pregap(pregap);
1555 } catch(std::exception& e) {
1556 messages << "Can't start stream: " << e.what() << std::endl;
1557 return;
1559 messages << "Tangent enaged." << std::endl;
1562 void voicesub_state::handle_tangent_negative_edge(opus_stream*& active_stream, bitrate_tracker& brtrack)
1564 threads::alock m2(current_collection_lock);
1565 messages << "Tangent disenaged: " << brtrack;
1566 try {
1567 active_stream->write_trailier();
1568 } catch(std::exception& e) {
1569 messages << e.what() << std::endl;
1571 if(current_collection) {
1572 try {
1573 current_collection->add_stream(*active_stream);
1574 } catch(std::exception& e) {
1575 messages << "Can't add stream: " << e.what() << std::endl;
1576 active_stream->put_ref();
1578 edispatch.voice_stream_change();
1579 } else
1580 active_stream->put_ref();
1581 active_stream = NULL;
1584 class inthread_th : public workthread
1586 public:
1587 inthread_th(voicesub_state* _internal, audioapi_instance& _audio)
1588 : internal(*_internal), audio(_audio)
1590 quit = false;
1591 quit_ack = false;
1592 rptr = 0;
1593 fire();
1595 void kill()
1597 quit = true;
1599 threads::alock h(lmut);
1600 lcond.notify_all();
1602 while(!quit_ack)
1603 usleep(100000);
1604 usleep(100000);
1606 protected:
1607 void entry()
1609 try {
1610 entry2();
1611 } catch(std::bad_alloc& e) {
1612 OOM_panic();
1613 } catch(std::exception& e) {
1614 messages << "AIEEE... Fatal exception in voice thread: " << e.what() << std::endl;
1616 quit_ack = true;
1618 void entry2()
1620 //Wait for libopus to load...
1621 size_t cbh = opus::add_callback([this]() {
1622 threads::alock h(this->lmut);
1623 this->lcond.notify_all();
1625 while(true) {
1626 threads::alock h(lmut);
1627 if(opus::libopus_loaded() || quit)
1628 break;
1629 lcond.wait(h);
1631 opus::cancel_callback(cbh);
1632 if(quit)
1633 return;
1635 opus::encoder oenc(opus::samplerate::r48k, false, opus::application::voice);
1636 oenc.ctl(opus::bitrate(SET_opus_bitrate(internal.settings)));
1637 audioapi_instance::resampler rin;
1638 audioapi_instance::resampler rout;
1639 const unsigned buf_max = 6144; //These buffers better be large.
1640 size_t buf_in_use = 0;
1641 size_t buf_inr_use = 0;
1642 size_t buf_outr_use = 0;
1643 size_t buf_out_use = 0;
1644 float buf_in[buf_max];
1645 float buf_inr[OPUS_BLOCK_SIZE];
1646 float buf_outr[OUTPUT_SIZE];
1647 float buf_out[buf_max];
1648 bitrate_tracker brtrack;
1649 opus_stream* active_stream = NULL;
1651 internal.drain_input();
1652 while(1) {
1653 if(clear_workflag(workthread::quit_request) & workthread::quit_request) {
1654 if(!internal.active_flag && active_stream)
1655 internal.handle_tangent_negative_edge(active_stream, brtrack);
1656 break;
1658 uint64_t ticks = framerate_regulator::get_utime();
1659 //Handle tangent edges.
1660 if(internal.active_flag && !active_stream) {
1661 internal.drain_input();
1662 buf_in_use = 0;
1663 buf_inr_use = 0;
1664 internal.handle_tangent_positive_edge(oenc, active_stream, brtrack);
1665 //If stream didn't start, autodrop the activity flag.
1666 if(!active_stream)
1667 internal.active_flag = false;
1669 else if((!internal.active_flag || quit) && active_stream)
1670 internal.handle_tangent_negative_edge(active_stream, brtrack);
1671 if(quit)
1672 break;
1674 //Read input, up to 25ms.
1675 unsigned rate_in = audio.voice_rate().first;
1676 unsigned rate_out = audio.voice_rate().second;
1677 size_t dbuf_max = min(buf_max, rate_in / REC_THRESHOLD_DIV);
1678 internal.read_input(buf_in, buf_in_use, dbuf_max);
1680 //Contribute some entropy.
1681 contribute_random_entropy(buf_in, buf_in_use * sizeof(float));
1683 //Resample up to full opus block.
1684 internal.do_resample(rin, buf_in, buf_in_use, buf_inr, buf_inr_use, OPUS_BLOCK_SIZE,
1685 1.0 * OPUS_SAMPLERATE / rate_in);
1687 //If we have full opus block and recording is enabled, compress it.
1688 if(buf_inr_use >= OPUS_BLOCK_SIZE && active_stream)
1689 internal.compress_opus_block(oenc, buf_inr, buf_inr_use, *active_stream,
1690 brtrack);
1692 //Update time, starting/ending streams.
1693 internal.update_time();
1695 //Decompress active streams.
1696 if(buf_outr_use < BLOCK_THRESHOLD)
1697 internal.decompress_active_streams(buf_outr, buf_outr_use);
1699 //Resample to output rate.
1700 internal.do_resample(rout, buf_outr, buf_outr_use, buf_out, buf_out_use, buf_max,
1701 1.0 * rate_out / OPUS_SAMPLERATE);
1703 //Output stuff.
1704 if(buf_out_use > 0 && audio.voice_p_status2() < rate_out / PLAY_THRESHOLD_DIV) {
1705 audio.play_voice(buf_out, buf_out_use);
1706 buf_out_use = 0;
1709 //Sleep a bit to save CPU use.
1710 uint64_t ticks_spent = framerate_regulator::get_utime() - ticks;
1711 if(ticks_spent < ITERATION_TIME)
1712 usleep(ITERATION_TIME - ticks_spent);
1714 threads::alock h(internal.current_collection_lock);
1715 delete internal.current_collection;
1716 internal.current_collection = NULL;
1718 private:
1719 size_t rptr;
1720 double position;
1721 volatile bool quit;
1722 volatile bool quit_ack;
1723 threads::lock lmut;
1724 threads::cv lcond;
1725 voicesub_state& internal;
1726 audioapi_instance& audio;
1730 voice_commentary::voice_commentary(settingvar::group& _settings, emulator_dispatch& _dispatch,
1731 audioapi_instance& _audio, command::group& _cmd)
1732 : settings(_settings), edispatch(_dispatch), audio(_audio), cmd(_cmd),
1733 tangentp(cmd, CCOMMENTARY::p, [this]() { this->set_active_flag(true); }),
1734 tangentr(cmd, CCOMMENTARY::r, [this]() { this->set_active_flag(false); })
1736 internal = NULL;
1739 voice_commentary::~voice_commentary()
1741 if(internal)
1742 kill();
1745 //Rate is not sampling rate!
1746 void voice_commentary::frame_number(uint64_t newframe, double rate)
1748 if(!internal)
1749 return;
1750 auto _internal = get_state(internal);
1751 if(rate == _internal->last_rate && _internal->last_frame_number == newframe)
1752 return;
1753 threads::alock m(_internal->time_mutex);
1754 _internal->current_time = newframe / rate * OPUS_SAMPLERATE;
1755 if(fabs(rate - _internal->last_rate) > 1e-6 || _internal->last_frame_number + 1 != newframe)
1756 _internal->time_jump = true;
1757 _internal->last_frame_number = newframe;
1758 _internal->last_rate = rate;
1761 void voice_commentary::init()
1763 internal = new voicesub_state(settings, edispatch, audio);
1764 auto _internal = get_state(internal);
1765 try {
1766 _internal->int_task = new inthread_th(_internal, audio);
1767 } catch(...) {
1768 delete _internal;
1769 throw;
1773 void voice_commentary::kill()
1775 auto _internal = get_state(internal);
1776 _internal->int_task->kill();
1777 delete _internal->int_task;
1778 _internal->int_task = NULL;
1779 delete _internal;
1780 internal = NULL;
1783 uint64_t voice_commentary::parse_timebase(const std::string& n)
1785 std::string x = n;
1786 if(x.length() > 0 && x[x.length() - 1] == 's') {
1787 x = x.substr(0, x.length() - 1);
1788 return 48000 * parse_value<double>(x);
1789 } else
1790 return parse_value<uint64_t>(x);
1793 bool voice_commentary::collection_loaded()
1795 if(!internal) return false;
1796 auto _internal = get_state(internal);
1797 threads::alock m2(_internal->current_collection_lock);
1798 return (_internal->current_collection != NULL);
1801 std::list<voice_commentary::playback_stream_info> voice_commentary::get_stream_info()
1803 std::list<voice_commentary::playback_stream_info> in;
1804 if(!internal)
1805 return in;
1806 auto _internal = get_state(internal);
1807 threads::alock m2(_internal->current_collection_lock);
1808 if(!_internal->current_collection)
1809 return in;
1810 for(auto i : _internal->current_collection->all_streams()) {
1811 opus_stream* s = _internal->current_collection->get_stream(i);
1812 voice_commentary::playback_stream_info pi;
1813 if(!s)
1814 continue;
1815 pi.id = i;
1816 pi.base = s->timebase();
1817 pi.length = s->length();
1818 try {
1819 in.push_back(pi);
1820 } catch(...) {
1822 s->put_ref();
1824 return in;
1827 void voice_commentary::play_stream(uint64_t id)
1829 auto _internal = get_state(internal);
1830 threads::alock m2(_internal->current_collection_lock);
1831 if(!_internal->current_collection)
1832 throw std::runtime_error("No collection loaded");
1833 opus_stream* s = _internal->current_collection->get_stream(id);
1834 if(!s)
1835 return;
1836 try {
1837 _internal->start_management_stream(*s);
1838 } catch(...) {
1839 s->put_ref();
1840 throw;
1842 s->put_ref();
1845 void voice_commentary::export_stream(uint64_t id, const std::string& filename,
1846 voice_commentary::external_stream_format fmt)
1848 auto _internal = get_state(internal);
1849 threads::alock m2(_internal->current_collection_lock);
1850 if(!_internal->current_collection)
1851 throw std::runtime_error("No collection loaded");
1852 opus_stream* st = _internal->current_collection->get_stream(id);
1853 if(!st)
1854 return;
1855 std::ofstream s(filename, std::ios_base::out | std::ios_base::binary);
1856 if(!s) {
1857 st->put_ref();
1858 throw std::runtime_error("Can't open output file");
1860 try {
1861 st->export_stream(s, fmt);
1862 } catch(std::exception& e) {
1863 st->put_ref();
1864 (stringfmt() << "Export failed: " << e.what()).throwex();
1866 st->put_ref();
1869 uint64_t voice_commentary::import_stream(uint64_t ts, const std::string& filename,
1870 voice_commentary::external_stream_format fmt)
1872 auto _internal = get_state(internal);
1873 threads::alock m2(_internal->current_collection_lock);
1874 if(!_internal->current_collection)
1875 throw std::runtime_error("No collection loaded");
1877 std::ifstream s(filename, std::ios_base::in | std::ios_base::binary);
1878 if(!s)
1879 throw std::runtime_error("Can't open input file");
1880 opus_stream* st = new opus_stream(ts, _internal->current_collection->get_filesystem(), s, fmt, settings);
1881 uint64_t id;
1882 try {
1883 id = _internal->current_collection->add_stream(*st);
1884 } catch(...) {
1885 st->delete_stream();
1886 throw;
1888 st->unlock(); //Not locked.
1889 edispatch.voice_stream_change();
1890 return id;
1893 void voice_commentary::delete_stream(uint64_t id)
1895 auto _internal = get_state(internal);
1896 threads::alock m2(_internal->current_collection_lock);
1897 if(!_internal->current_collection)
1898 throw std::runtime_error("No collection loaded");
1899 _internal->current_collection->delete_stream(id);
1900 edispatch.voice_stream_change();
1903 void voice_commentary::export_superstream(const std::string& filename)
1905 auto _internal = get_state(internal);
1906 threads::alock m2(_internal->current_collection_lock);
1907 if(!_internal->current_collection)
1908 throw std::runtime_error("No collection loaded");
1909 std::ofstream s(filename, std::ios_base::out | std::ios_base::binary);
1910 if(!s)
1911 throw std::runtime_error("Can't open output file");
1912 _internal->current_collection->export_superstream(s);
1915 void voice_commentary::load_collection(const std::string& filename)
1917 auto _internal = get_state(internal);
1918 threads::alock m2(_internal->current_collection_lock);
1919 filesystem::ref newfs;
1920 stream_collection* newc;
1921 newfs = filesystem::ref(filename);
1922 newc = new stream_collection(newfs);
1923 if(_internal->current_collection)
1924 delete _internal->current_collection;
1925 _internal->current_collection = newc;
1926 edispatch.voice_stream_change();
1929 void voice_commentary::unload_collection()
1931 if(!internal) return;
1932 auto _internal = get_state(internal);
1933 threads::alock m2(_internal->current_collection_lock);
1934 if(_internal->current_collection)
1935 delete _internal->current_collection;
1936 _internal->current_collection = NULL;
1937 edispatch.voice_stream_change();
1940 void voice_commentary::alter_timebase(uint64_t id, uint64_t ts)
1942 auto _internal = get_state(internal);
1943 threads::alock m2(_internal->current_collection_lock);
1944 if(!_internal->current_collection)
1945 throw std::runtime_error("No collection loaded");
1946 _internal->current_collection->alter_stream_timebase(id, ts);
1947 edispatch.voice_stream_change();
1950 float voice_commentary::get_gain(uint64_t id)
1952 auto _internal = get_state(internal);
1953 threads::alock m2(_internal->current_collection_lock);
1954 if(!_internal->current_collection)
1955 throw std::runtime_error("No collection loaded");
1956 return _internal->current_collection->get_stream(id)->get_gain() / 256.0;
1959 void voice_commentary::set_gain(uint64_t id, float gain)
1961 auto _internal = get_state(internal);
1962 threads::alock m2(_internal->current_collection_lock);
1963 if(!_internal->current_collection)
1964 throw std::runtime_error("No collection loaded");
1965 int64_t _gain = gain * 256;
1966 if(_gain < -32768 || _gain > 32767)
1967 throw std::runtime_error("Gain out of range (+-128dB)");
1968 _internal->current_collection->alter_stream_gain(id, _gain);
1969 edispatch.voice_stream_change();
1972 double voice_commentary::ts_seconds(uint64_t ts)
1974 return ts / 48000.0;
1977 void voice_commentary::set_active_flag(bool flag)
1979 if(!internal) return;
1980 auto _internal = get_state(internal);
1981 _internal->active_flag = flag;