Tweak format of command help files and do some further command cleanup
[lsnes.git] / src / core / inthread.cpp
blobb910aeb85354801d35970cb656d70dc551f52562
1 #include "lsnes.hpp"
3 #include "cmdhelp/commentary.hpp"
4 #include "core/audioapi.hpp"
5 #include "core/command.hpp"
6 #include "core/dispatch.hpp"
7 #include "core/framerate.hpp"
8 #include "core/instance.hpp"
9 #include "core/inthread.hpp"
10 #include "core/keymapper.hpp"
11 #include "core/messages.hpp"
12 #include "core/settings.hpp"
13 #include "library/filesystem.hpp"
14 #include "library/minmax.hpp"
15 #include "library/ogg.hpp"
16 #include "library/opus.hpp"
17 #include "library/opus-ogg.hpp"
18 #include "library/serialization.hpp"
19 #include "library/string.hpp"
20 #include "library/workthread.hpp"
22 #include <cstdint>
23 #include <cmath>
24 #include <list>
25 #include <iostream>
26 #include <fstream>
27 #include <cstring>
28 #include <unistd.h>
29 #include <sys/time.h>
30 #include <zlib.h>
32 //Farther than this, packets can be fastskipped.
33 #define OPUS_CONVERGE_MAX 5760
34 //Maximum size of PCM output for one packet.
35 #define OPUS_MAX_OUT 5760
36 //Output block size.
37 #define OUTPUT_BLOCK 1440
38 //Main sampling rate.
39 #define OPUS_SAMPLERATE 48000
40 //Opus block size
41 #define OPUS_BLOCK_SIZE 960
42 //Threshold for decoding additional block
43 #define BLOCK_THRESHOLD 1200
44 //Maximum output block size.
45 #define OUTPUT_SIZE (BLOCK_THRESHOLD + OUTPUT_BLOCK)
46 //Amount of microseconds per iteration.
47 #define ITERATION_TIME 15000
48 //Opus bitrate to use.
49 #define OPUS_BITRATE 48000
50 //Opus min bitrate to use.
51 #define OPUS_MIN_BITRATE 8000
52 //Opus max bitrate to use.
53 #define OPUS_MAX_BITRATE 255000
54 //Ogg Opus granule rate.
55 #define OGGOPUS_GRANULERATE 48000
56 //Record buffer size threshold divider.
57 #define REC_THRESHOLD_DIV 40
58 //Playback buffer size threshold divider.
59 #define PLAY_THRESHOLD_DIV 30
60 //Special granule position: None.
61 #define GRANULEPOS_NONE 0xFFFFFFFFFFFFFFFFULL
63 namespace
65 class opus_playback_stream;
66 class opus_stream;
67 class stream_collection;
68 class bitrate_tracker;
69 class inthread_th;
71 settingvar::supervariable<settingvar::model_int<OPUS_MIN_BITRATE,OPUS_MAX_BITRATE>> SET_opus_bitrate(
72 lsnes_setgrp, "opus-bitrate", "commentary‣Bitrate", OPUS_BITRATE);
73 settingvar::supervariable<settingvar::model_int<OPUS_MIN_BITRATE,OPUS_MAX_BITRATE>> SET_opus_max_bitrate(
74 lsnes_setgrp, "opus-max-bitrate", "commentary‣Max bitrate", OPUS_MAX_BITRATE);
76 struct voicesub_state
78 voicesub_state(settingvar::group& _settings, emulator_dispatch& _dispatch, audioapi_instance& _audio)
79 : settings(_settings), edispatch(_dispatch), audio(_audio)
81 current_time = 0;
82 time_jump = false;
83 active_flag = false;
84 last_frame_number = 0;
85 last_rate = 0;
86 current_collection = NULL;
87 int_task = NULL;
89 //Recording active flag.
90 volatile bool active_flag;
91 //Last seen frame number.
92 uint64_t last_frame_number;
93 //Last seen rate.
94 double last_rate;
95 //Mutex protecting current_time and time_jump.
96 threads::lock time_mutex;
97 //The current time.
98 uint64_t current_time;
99 //Time jump flag. Set if time jump is detected.
100 //If time jump is detected, all current playing streams are stopped, stream locks are cleared and
101 //apropriate streams are restarted. If time jump is false, all unlocked streams coming into range
102 //are started.
103 bool time_jump;
104 //Lock protecting active_playback_streams.
105 threads::lock active_playback_streams_lock;
106 //List of streams currently playing.
107 std::list<opus_playback_stream*> active_playback_streams;
108 //The collection of streams.
109 stream_collection* current_collection;
110 //Lock protecting current collection.
111 threads::lock current_collection_lock;
112 //The task handling the stuff.
113 inthread_th* int_task;
114 //Functions.
115 void start_management_stream(opus_stream& s);
116 void advance_time(uint64_t newtime);
117 void jump_time(uint64_t newtime);
118 void do_resample(audioapi_instance::resampler& r, float* srcbuf, size_t& srcuse, float* dstbuf,
119 size_t& dstuse, size_t dstmax, double ratio);
120 void drain_input();
121 void read_input(float* buf, size_t& use, size_t maxuse);
122 void compress_opus_block(opus::encoder& e, float* buf, size_t& use,
123 opus_stream& active_stream, bitrate_tracker& brtrack);
124 void update_time();
125 void decompress_active_streams(float* out, size_t& use);
126 void handle_tangent_positive_edge(opus::encoder& e, opus_stream*& active_stream,
127 bitrate_tracker& brtrack);
128 void handle_tangent_negative_edge(opus_stream*& active_stream, bitrate_tracker& brtrack);
129 settingvar::group& settings;
130 emulator_dispatch& edispatch;
131 audioapi_instance& audio;
134 voicesub_state* get_state(void* ptr)
136 auto x = reinterpret_cast<voicesub_state*>(ptr);
137 if(!x)
138 throw std::runtime_error("voice_commentary not initialized");
139 return x;
142 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
143 //Bitrate tracker.
144 struct bitrate_tracker
146 bitrate_tracker() throw();
147 void reset() throw();
148 double get_min() throw();
149 double get_avg() throw();
150 double get_max() throw();
151 double get_length() throw();
152 uint64_t get_bytes() throw();
153 uint64_t get_blocks() throw();
154 void submit(uint32_t bytes, uint32_t samples) throw();
155 private:
156 uint64_t blocks;
157 uint64_t samples;
158 uint64_t bytes;
159 uint32_t minrate;
160 uint32_t maxrate;
163 bitrate_tracker::bitrate_tracker() throw()
165 reset();
168 void bitrate_tracker::reset() throw()
170 blocks = 0;
171 samples = 0;
172 bytes = 0;
173 minrate = std::numeric_limits<uint32_t>::max();
174 maxrate = 0;
177 double bitrate_tracker::get_min() throw()
179 return blocks ? minrate / 1000.0 : 0.0;
182 double bitrate_tracker::get_avg() throw()
184 return samples ? bytes / (125.0 * samples / OPUS_SAMPLERATE) : 0.0;
187 double bitrate_tracker::get_max() throw()
189 return blocks ? maxrate / 1000.0 : 0.0;
192 double bitrate_tracker::get_length() throw()
194 return 1.0 * samples / OPUS_SAMPLERATE;
197 uint64_t bitrate_tracker::get_bytes() throw()
199 return bytes;
202 uint64_t bitrate_tracker::get_blocks() throw()
204 return blocks;
207 void bitrate_tracker::submit(uint32_t _bytes, uint32_t _samples) throw()
209 blocks++;
210 samples += _samples;
211 bytes += _bytes;
212 uint32_t irate = _bytes * 8 * OPUS_SAMPLERATE / OPUS_BLOCK_SIZE;
213 minrate = min(minrate, irate);
214 maxrate = max(maxrate, irate);
217 std::ostream& operator<<(std::ostream& s, bitrate_tracker& t)
219 s << t.get_bytes() << " bytes for " << t.get_length() << "s (" << t.get_blocks() << " blocks)"
220 << std::endl << "Bitrate (kbps): min: " << t.get_min() << " avg: " << t.get_avg() << " max:"
221 << t.get_max() << std::endl;
222 return s;
225 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
226 //Information about individual opus packet in stream.
227 struct opus_packetinfo
229 //Length is in units of 1/400th of a second.
230 opus_packetinfo(uint16_t datasize, uint8_t length, uint64_t offset)
232 descriptor = (offset & 0xFFFFFFFFFFULL) | (static_cast<uint64_t>(length) << 40) |
233 (static_cast<uint64_t>(datasize) << 48);
235 //Get the data size of the packet.
236 uint16_t size() { return descriptor >> 48; }
237 //Calculate the length of packet in samples.
238 uint16_t length() { return 120 * ((descriptor >> 40) & 0xFF); }
239 //Calculate the true offset.
240 uint64_t offset() { return descriptor & 0xFFFFFFFFFFULL; }
241 //Read the packet.
242 //Can throw.
243 std::vector<unsigned char> packet(filesystem::ref from_sys);
244 private:
245 uint64_t descriptor;
248 std::vector<unsigned char> opus_packetinfo::packet(filesystem::ref from_sys)
250 std::vector<unsigned char> ret;
251 uint64_t off = offset();
252 uint32_t sz = size();
253 uint32_t cluster = off / CLUSTER_SIZE;
254 uint32_t coff = off % CLUSTER_SIZE;
255 ret.resize(sz);
256 size_t r = from_sys.read_data(cluster, coff, &ret[0], sz);
257 if(r != sz)
258 throw std::runtime_error("Incomplete read");
259 return ret;
262 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
263 //Information about opus stream.
264 struct opus_stream
266 //Create new empty stream with specified base time.
267 opus_stream(uint64_t base, filesystem::ref filesys);
268 //Read stream with specified base time and specified start clusters.
269 //Can throw.
270 opus_stream(uint64_t base, filesystem::ref filesys, uint32_t ctrl_cluster, uint32_t data_cluster);
271 //Import a stream with specified base time.
272 //Can throw.
273 opus_stream(uint64_t base, filesystem::ref filesys, std::ifstream& data,
274 voice_commentary::external_stream_format extfmt, settingvar::group& settings);
275 //Delete this stream (also puts a ref)
276 void delete_stream() { deleting = true; put_ref(); }
277 //Export a stream.
278 //Can throw.
279 void export_stream(std::ofstream& data, voice_commentary::external_stream_format extfmt);
280 //Get length of specified packet in samples.
281 uint16_t packet_length(uint32_t seqno)
283 return (seqno < packets.size()) ? packets[seqno].length() : 0;
285 //Get data of specified packet.
286 //Can throw.
287 std::vector<unsigned char> packet(uint32_t seqno)
289 return (seqno < packets.size()) ? packets[seqno].packet(fs) : std::vector<unsigned char>();
291 //Get base time in samples for stream.
292 uint64_t timebase() { return s_timebase; }
293 //Set base time in samples for stream.
294 void timebase(uint64_t ts) { s_timebase = ts; }
295 //Get length of stream in samples.
296 uint64_t length()
298 if(pregap_length + postgap_length > total_len)
299 return 0;
300 else
301 return total_len - pregap_length - postgap_length;
303 //Set the pregap length.
304 void set_pregap(uint32_t p) { pregap_length = p; }
305 //Get the pregap length.
306 uint32_t get_pregap() { return pregap_length; }
307 //Set the postgap length.
308 void set_potsgap(uint32_t p) { postgap_length = p; }
309 //Get the postgap length.
310 uint32_t get_postgap() { return postgap_length; }
311 //Set gain.
312 void set_gain(int16_t g) { gain = g; }
313 //Get gain.
314 int16_t get_gain() { return gain; }
315 //Get linear gain.
316 float get_gain_linear() { return pow(10, gain / 20); }
317 //Get number of packets in stream.
318 uint32_t blocks() { return packets.size(); }
319 //Is this stream locked?
320 bool islocked() { return locked; }
321 //Lock a stream.
322 void lock() { locked = true; }
323 //Unlock a stream.
324 void unlock() { locked = false; }
325 //Increment reference count.
326 void get_ref() { threads::alock m(reflock); refcount++; }
327 //Decrement reference count, destroying object if it hits zero.
328 void put_ref() { threads::alock m(reflock); refcount--; if(!refcount) destroy(); }
329 //Add new packet into stream.
330 //Not safe to call simultaneously with packet_length() or packet().
331 //Can throw.
332 void write(uint8_t len, const unsigned char* payload, size_t payload_len);
333 //Write stream trailer.
334 void write_trailier();
335 //Get clusters.
336 std::pair<uint32_t, uint32_t> get_clusters() { return std::make_pair(ctrl_cluster, data_cluster); }
337 private:
338 void export_stream_sox(std::ofstream& data);
339 void export_stream_oggopus(std::ofstream& data);
340 void import_stream_sox(std::ifstream& data, settingvar::group& settings);
341 void import_stream_oggopus(std::ifstream& data);
343 opus_stream(const opus_stream&);
344 opus_stream& operator=(const opus_stream&);
345 void destroy();
346 filesystem::ref fs;
347 std::vector<opus_packetinfo> packets;
348 uint64_t total_len;
349 uint64_t s_timebase;
350 uint32_t next_cluster;
351 uint32_t next_offset;
352 uint32_t next_mcluster;
353 uint32_t next_moffset;
354 uint32_t ctrl_cluster;
355 uint32_t data_cluster;
356 uint32_t pregap_length;
357 uint32_t postgap_length;
358 int16_t gain;
359 bool locked;
360 threads::lock reflock;
361 unsigned refcount;
362 bool deleting;
365 opus_stream::opus_stream(uint64_t base, filesystem::ref filesys)
366 : fs(filesys)
368 refcount = 1;
369 deleting = false;
370 total_len = 0;
371 s_timebase = base;
372 locked = false;
373 next_cluster = 0;
374 next_mcluster = 0;
375 next_offset = 0;
376 next_moffset = 0;
377 ctrl_cluster = 0;
378 data_cluster = 0;
379 pregap_length = 0;
380 postgap_length = 0;
381 gain = 0;
384 opus_stream::opus_stream(uint64_t base, filesystem::ref filesys, uint32_t _ctrl_cluster,
385 uint32_t _data_cluster)
386 : fs(filesys)
388 refcount = 1;
389 deleting = false;
390 total_len = 0;
391 s_timebase = base;
392 locked = false;
393 next_cluster = data_cluster = _data_cluster;
394 next_mcluster = ctrl_cluster = _ctrl_cluster;
395 next_offset = 0;
396 next_moffset = 0;
397 pregap_length = 0;
398 postgap_length = 0;
399 gain = 0;
400 //Read the data buffers.
401 char buf[CLUSTER_SIZE];
402 uint32_t last_cluster_seen = next_mcluster;
403 uint64_t total_size = 0;
404 uint64_t total_frames = 0;
405 bool trailers = false;
406 bool saved_pointer_valid = false;
407 uint32_t saved_next_mcluster = 0;
408 uint32_t saved_next_moffset = 0;
409 while(true) {
410 last_cluster_seen = next_mcluster;
411 size_t r = fs.read_data(next_mcluster, next_moffset, buf, CLUSTER_SIZE);
412 if(!r) {
413 //The stream ends here.
414 break;
416 //Find the first unused entry if any.
417 for(unsigned i = 0; i < CLUSTER_SIZE; i += 4)
418 if(!buf[i + 3] || trailers) {
419 //This entry is unused. If the next entry is also unused, that is the end.
420 //Otherwise, there might be stream trailers.
421 if(trailers && !buf[i + 3]) {
422 goto out_parsing; //Ends for real.
424 if(!trailers) {
425 //Set the trailer flag and continue parsing.
426 //The saved offset must be placed here.
427 saved_next_mcluster = last_cluster_seen;
428 saved_next_moffset = i;
429 saved_pointer_valid = true;
430 trailers = true;
431 continue;
433 //This is a trailer entry.
434 if(buf[i + 3] == 2) {
435 //Pregap.
436 pregap_length = serialization::u32b(buf + i) >> 8;
437 } else if(buf[i + 3] == 3) {
438 //Postgap.
439 postgap_length = serialization::u32b(buf + i) >> 8;
440 } else if(buf[i + 3] == 4) {
441 //Gain.
442 gain = serialization::s16b(buf + i);
444 } else {
445 uint16_t psize = serialization::u16b(buf + i);
446 uint8_t plen = serialization::u8b(buf + i + 2);
447 total_size += psize;
448 total_len += 120 * plen;
449 opus_packetinfo p(psize, plen, 1ULL * next_cluster * CLUSTER_SIZE +
450 next_offset);
451 size_t r2 = fs.skip_data(next_cluster, next_offset, psize);
452 if(r2 < psize)
453 throw std::runtime_error("Incomplete data stream");
454 packets.push_back(p);
455 total_frames++;
458 out_parsing:
459 //If saved pointer is valid, restore to that.
460 if(saved_pointer_valid) {
461 next_mcluster = saved_next_mcluster;
462 next_moffset = saved_next_moffset;
466 opus_stream::opus_stream(uint64_t base, filesystem::ref filesys, std::ifstream& data,
467 voice_commentary::external_stream_format extfmt, settingvar::group& settings)
468 : fs(filesys)
470 refcount = 1;
471 deleting = false;
472 total_len = 0;
473 s_timebase = base;
474 locked = false;
475 next_cluster = 0;
476 next_mcluster = 0;
477 next_offset = 0;
478 next_moffset = 0;
479 ctrl_cluster = 0;
480 data_cluster = 0;
481 pregap_length = 0;
482 postgap_length = 0;
483 gain = 0;
484 if(extfmt == voice_commentary::EXTFMT_OGGOPUS)
485 import_stream_oggopus(data);
486 else if(extfmt == voice_commentary::EXTFMT_SOX)
487 import_stream_sox(data, settings);
490 void opus_stream::import_stream_oggopus(std::ifstream& data)
492 ogg::stream_reader_iostreams reader(data);
493 reader.set_errors_to(messages);
494 struct opus::ogg_header h;
495 struct opus::ogg_tags t;
496 ogg::page page;
497 ogg::demuxer d(messages);
498 int state = 0;
499 postgap_length = 0;
500 uint64_t datalen = 0;
501 uint64_t last_datalen = 0;
502 uint64_t last_granulepos = 0;
503 try {
504 while(true) {
505 ogg::packet p;
506 if(!d.wants_packet_out()) {
507 if(!reader.get_page(page))
508 break;
509 d.page_in(page);
510 continue;
511 } else
512 d.packet_out(p);
513 switch(state) {
514 case 0: //Not locked.
515 h.parse(p);
516 if(h.streams != 1)
517 throw std::runtime_error("Multistream OggOpus streams are not "
518 "supported");
519 state = 1; //Expecting comment.
520 pregap_length = h.preskip;
521 gain = h.gain;
522 break;
523 case 1: //Expecting comment.
524 t.parse(p);
525 state = 2; //Data page.
526 if(page.get_eos())
527 throw std::runtime_error("Empty OggOpus stream");
528 break;
529 case 2: //Data page.
530 case 3: //Data page.
531 const std::vector<uint8_t>& pkt = p.get_vector();
532 uint8_t tcnt = opus::packet_tick_count(&pkt[0], pkt.size());
533 if(tcnt) {
534 write(tcnt, &pkt[0], pkt.size());
535 datalen += tcnt * 120;
537 if(p.get_last_page()) {
538 uint64_t samples = p.get_granulepos() - last_granulepos;
539 if(samples > p.get_granulepos())
540 samples = 0;
541 uint64_t rsamples = datalen - last_datalen;
542 bool eos = p.get_on_eos_page();
543 if((samples > rsamples && (state == 3 || eos)) || (samples <
544 rsamples && !eos))
545 messages << "Warning: Granulepos says there are "
546 << samples << " samples, found " << rsamples
547 << std::endl;
548 last_datalen = datalen;
549 last_granulepos = p.get_granulepos();
550 if(p.get_on_eos_page()) {
551 if(samples < rsamples)
552 postgap_length = rsamples - samples;
553 state = 4;
554 goto out;
557 state = 3;
558 break;
561 out:
562 if(state == 0)
563 throw std::runtime_error("No OggOpus stream found");
564 if(state == 1)
565 throw std::runtime_error("Oggopus stream missing required tags pages");
566 if(state == 2 || state == 3)
567 messages << "Warning: Incomplete Oggopus stream." << std::endl;
568 if(datalen <= pregap_length)
569 throw std::runtime_error("Stream too short (entiere pregap not present)");
570 write_trailier();
571 } catch(...) {
572 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
573 if(data_cluster) fs.free_cluster_chain(data_cluster);
574 throw;
578 void opus_stream::import_stream_sox(std::ifstream& data, settingvar::group& settings)
580 bitrate_tracker brtrack;
581 unsigned char tmpi[65536];
582 float tmp[OPUS_MAX_OUT];
583 char header[260];
584 data.read(header, 32);
585 if(!data)
586 throw std::runtime_error("Can't read .sox header");
587 if(serialization::u32l(header + 0) != 0x586F532EULL)
588 throw std::runtime_error("Bad .sox header magic");
589 if(serialization::u8b(header + 4) > 28)
590 data.read(header + 32, serialization::u8b(header + 4) - 28);
591 if(!data)
592 throw std::runtime_error("Can't read .sox header");
593 if(serialization::u64l(header + 16) != 4676829883349860352ULL)
594 throw std::runtime_error("Bad .sox sampling rate");
595 if(serialization::u32l(header + 24) != 1)
596 throw std::runtime_error("Only mono streams are supported");
597 uint64_t samples = serialization::u64l(header + 8);
598 opus::encoder enc(opus::samplerate::r48k, false, opus::application::voice);
599 enc.ctl(opus::bitrate(SET_opus_bitrate(settings)));
600 int32_t pregap = enc.ctl(opus::lookahead);
601 pregap_length = pregap;
602 for(uint64_t i = 0; i < samples + pregap; i += OPUS_BLOCK_SIZE) {
603 size_t bs = OPUS_BLOCK_SIZE;
604 if(i + bs > samples + pregap)
605 bs = samples + pregap - i;
606 //We have to read zero bytes after the end of stream.
607 size_t readable = bs;
608 if(readable + i > samples)
609 readable = max(samples, i) - i;
610 if(readable > 0)
611 data.read(reinterpret_cast<char*>(tmpi), 4 * readable);
612 if(readable < bs)
613 memset(tmpi + 4 * readable, 0, 4 * (bs - readable));
614 if(!data) {
615 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
616 if(data_cluster) fs.free_cluster_chain(data_cluster);
617 throw std::runtime_error("Can't read .sox data");
619 for(size_t j = 0; j < bs; j++)
620 tmp[j] = static_cast<float>(serialization::s32l(tmpi + 4 * j)) / 268435456;
621 if(bs < OPUS_BLOCK_SIZE)
622 postgap_length = OPUS_BLOCK_SIZE - bs;
623 for(size_t j = bs; j < OPUS_BLOCK_SIZE; j++)
624 tmp[j] = 0;
625 try {
626 const size_t opus_out_max2 = SET_opus_max_bitrate(settings) *
627 OPUS_BLOCK_SIZE / 384000;
628 size_t r = enc.encode(tmp, OPUS_BLOCK_SIZE, tmpi, opus_out_max2);
629 write(OPUS_BLOCK_SIZE / 120, tmpi, r);
630 brtrack.submit(r, bs);
631 } catch(std::exception& e) {
632 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
633 if(data_cluster) fs.free_cluster_chain(data_cluster);
634 (stringfmt() << "Error encoding opus packet: " << e.what()).throwex();
637 messages << "Imported stream: " << brtrack;
638 try {
639 write_trailier();
640 } catch(...) {
641 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
642 if(data_cluster) fs.free_cluster_chain(data_cluster);
643 throw;
647 void opus_stream::destroy()
649 if(deleting) {
650 //We catch the errors and print em, because otherwise put_ref could throw, which would
651 //be too much.
652 try {
653 fs.free_cluster_chain(ctrl_cluster);
654 } catch(std::exception& e) {
655 messages << "Failed to delete stream control file: " << e.what();
657 try {
658 fs.free_cluster_chain(data_cluster);
659 } catch(std::exception& e) {
660 messages << "Failed to delete stream data file: " << e.what();
663 delete this;
666 void opus_stream::export_stream_oggopus(std::ofstream& data)
668 if(!packets.size())
669 throw std::runtime_error("Empty oggopus stream is not valid");
670 opus::ogg_header header;
671 opus::ogg_tags tags;
672 ogg::stream_writer_iostreams writer(data);
673 unsigned stream_id = 1;
674 uint64_t true_granule = 0;
675 uint32_t seq = 2;
676 //Headers / Tags.
677 header.version = 1;
678 header.channels = 1;
679 header.preskip = pregap_length;
680 header.rate = OPUS_SAMPLERATE;
681 header.gain = gain;
682 header.map_family = 0;
683 header.streams = 1;
684 header.coupled = 0;
685 header.chanmap[0] = 0;
686 memset(header.chanmap + 1, 255, 254);
687 tags.vendor = "unknown";
688 tags.comments.push_back((stringfmt() << "ENCODER=lsnes rr" + lsnes_version).str());
689 tags.comments.push_back((stringfmt() << "LSNES_STREAM_TS=" << s_timebase).str());
691 struct ogg::page hpage = header.serialize();
692 hpage.set_stream(stream_id);
693 writer.put_page(hpage);
694 seq = tags.serialize([&writer](const ogg::page& p) { writer.put_page(p); }, stream_id);
696 struct ogg::page ppage;
697 ogg::muxer mux(stream_id, seq);
698 for(size_t i = 0; i < packets.size(); i++) {
699 std::vector<unsigned char> p;
700 try {
701 p = packet(i);
702 } catch(std::exception& e) {
703 (stringfmt() << "Error reading opus packet: " << e.what()).throwex();
705 if(!p.size())
706 (stringfmt() << "Empty Opus packet is not valid").throwex();
707 uint32_t samples = static_cast<uint32_t>(opus::packet_tick_count(&p[0], p.size())) * 120;
708 if(i + 1 < packets.size())
709 true_granule += samples;
710 else
711 true_granule = max(true_granule, true_granule + samples - postgap_length);
712 if(!mux.wants_packet_in() || !mux.packet_fits(p.size()))
713 while(mux.has_page_out()) {
714 mux.page_out(ppage);
715 writer.put_page(ppage);
717 mux.packet_in(p, true_granule);
719 mux.signal_eos();
720 while(mux.has_page_out()) {
721 mux.page_out(ppage);
722 writer.put_page(ppage);
726 void opus_stream::export_stream_sox(std::ofstream& data)
728 opus::decoder dec(opus::samplerate::r48k, false);
729 std::vector<unsigned char> p;
730 float tmp[OPUS_MAX_OUT];
731 char header[32];
732 serialization::u64l(header, 0x1C586F532EULL); //Magic and header size.
733 serialization::u64l(header + 16, 4676829883349860352ULL); //Sampling rate.
734 serialization::u32l(header + 24, 1);
735 uint64_t tlen = 0;
736 uint32_t lookahead_thrown = 0;
737 data.write(header, 32);
738 if(!data)
739 throw std::runtime_error("Error writing PCM data.");
740 float lgain = get_gain_linear();
741 for(size_t i = 0; i < packets.size(); i++) {
742 char blank[4] = {0, 0, 0, 0};
743 try {
744 uint32_t pregap_throw = 0;
745 uint32_t postgap_throw = 0;
746 std::vector<unsigned char> p = packet(i);
747 uint32_t len = packet_length(i);
748 dec.decode(&p[0], p.size(), tmp, OPUS_MAX_OUT);
749 bool is_last = (i == packets.size() - 1);
750 if(lookahead_thrown < pregap_length) {
751 //We haven't yet thrown the full pregap. Throw some.
752 uint32_t maxthrow = pregap_length - lookahead_thrown;
753 pregap_throw = min(len, maxthrow);
754 lookahead_thrown += pregap_length;
756 if(is_last)
757 postgap_throw = min(len - pregap_throw, postgap_length);
758 tlen += (len - pregap_throw - postgap_throw);
759 for(uint32_t j = pregap_throw; j < len - postgap_throw; j++) {
760 int32_t s = (int32_t)(tmp[j] * lgain * 268435456.0);
761 serialization::s32l(blank, s);
762 data.write(blank, 4);
763 if(!data)
764 throw std::runtime_error("Error writing PCM data.");
766 } catch(std::exception& e) {
767 (stringfmt() << "Error decoding opus packet: " << e.what()).throwex();
770 data.seekp(0, std::ios_base::beg);
771 serialization::u64l(header + 8, tlen);
772 data.write(header, 32);
773 if(!data) {
774 throw std::runtime_error("Error writing PCM data.");
778 void opus_stream::export_stream(std::ofstream& data, voice_commentary::external_stream_format extfmt)
780 if(extfmt == voice_commentary::EXTFMT_OGGOPUS)
781 export_stream_oggopus(data);
782 else if(extfmt == voice_commentary::EXTFMT_SOX)
783 export_stream_sox(data);
786 void opus_stream::write(uint8_t len, const unsigned char* payload, size_t payload_len)
788 try {
789 char descriptor[4];
790 uint32_t used_cluster, used_offset;
791 uint32_t used_mcluster, used_moffset;
792 if(!next_cluster)
793 next_cluster = data_cluster = fs.allocate_cluster();
794 if(!next_mcluster)
795 next_mcluster = ctrl_cluster = fs.allocate_cluster();
796 serialization::u16b(descriptor, payload_len);
797 serialization::u8b(descriptor + 2, len);
798 serialization::u8b(descriptor + 3, 1);
799 fs.write_data(next_cluster, next_offset, payload, payload_len, used_cluster, used_offset);
800 fs.write_data(next_mcluster, next_moffset, descriptor, 4, used_mcluster, used_moffset);
801 uint64_t off = static_cast<uint64_t>(used_cluster) * CLUSTER_SIZE + used_offset;
802 opus_packetinfo p(payload_len, len, off);
803 total_len += p.length();
804 packets.push_back(p);
805 } catch(std::exception& e) {
806 (stringfmt() << "Can't write opus packet: " << e.what()).throwex();
810 void opus_stream::write_trailier()
812 try {
813 char descriptor[16];
814 uint32_t used_mcluster, used_moffset;
815 //The allocation must be done for real.
816 if(!next_mcluster)
817 next_mcluster = ctrl_cluster = fs.allocate_cluster();
818 //But the write must not update the pointers..
819 uint32_t tmp_mcluster = next_mcluster;
820 uint32_t tmp_moffset = next_moffset;
821 serialization::u32b(descriptor, 0);
822 serialization::u32b(descriptor + 4, (pregap_length << 8) | 0x02);
823 serialization::u32b(descriptor + 8, (postgap_length << 8) | 0x03);
824 serialization::s16b(descriptor + 12, gain);
825 serialization::u16b(descriptor + 14, 0x0004);
826 fs.write_data(tmp_mcluster, tmp_moffset, descriptor, 16, used_mcluster, used_moffset);
827 } catch(std::exception& e) {
828 (stringfmt() << "Can't write stream trailer: " << e.what()).throwex();
833 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
834 //Playing opus stream.
835 struct opus_playback_stream
837 //Create a new playing stream from given opus stream.
838 opus_playback_stream(opus_stream& data);
839 //Destroy playing opus stream.
840 ~opus_playback_stream();
841 //Read samples from stream.
842 //Can throw.
843 void read(float* data, size_t samples);
844 //Skip samples from stream.
845 //Can throw.
846 void skip(uint64_t samples);
847 //Has the stream already ended?
848 bool eof();
849 private:
850 opus_playback_stream(const opus_playback_stream&);
851 opus_playback_stream& operator=(const opus_playback_stream&);
852 //Can throw.
853 void decode_block();
854 float output[OPUS_MAX_OUT];
855 unsigned output_left;
856 uint32_t pregap_thrown;
857 bool postgap_thrown;
858 opus::decoder* decoder;
859 opus_stream& stream;
860 uint32_t next_block;
861 uint32_t blocks;
864 opus_playback_stream::opus_playback_stream(opus_stream& data)
865 : stream(data)
867 stream.get_ref();
868 stream.lock();
869 next_block = 0;
870 output_left = 0;
871 pregap_thrown = 0;
872 postgap_thrown = false;
873 blocks = stream.blocks();
874 decoder = new opus::decoder(opus::samplerate::r48k, false);
875 if(!decoder)
876 throw std::bad_alloc();
879 opus_playback_stream::~opus_playback_stream()
881 //No, we don't unlock the stream.
882 stream.put_ref();
883 delete decoder;
886 bool opus_playback_stream::eof()
888 return (next_block >= blocks && !output_left);
891 void opus_playback_stream::decode_block()
893 if(next_block >= blocks)
894 return;
895 if(output_left >= OPUS_MAX_OUT)
896 return;
897 unsigned plen = stream.packet_length(next_block);
898 if(plen + output_left > OPUS_MAX_OUT)
899 return;
900 std::vector<unsigned char> pdata = stream.packet(next_block);
901 try {
902 size_t c = decoder->decode(&pdata[0], pdata.size(), output + output_left,
903 OPUS_MAX_OUT - output_left);
904 output_left = min(output_left + c, static_cast<size_t>(OPUS_MAX_OUT));
905 } catch(...) {
906 //Bad packet, insert silence.
907 for(unsigned i = 0; i < plen; i++)
908 output[output_left++] = 0;
910 //Throw the pregap away if needed.
911 if(pregap_thrown < stream.get_pregap()) {
912 uint32_t throw_amt = min(stream.get_pregap() - pregap_thrown, (uint32_t)output_left);
913 if(throw_amt && throw_amt < output_left)
914 memmove(output, output + throw_amt, (output_left - throw_amt) * sizeof(float));
915 output_left -= throw_amt;
916 pregap_thrown += throw_amt;
918 next_block++;
921 void opus_playback_stream::read(float* data, size_t samples)
923 float lgain = stream.get_gain_linear();
924 while(samples > 0) {
925 decode_block();
926 if(next_block >= blocks && !postgap_thrown) {
927 //This is the final packet. Throw away postgap samples at the end.
928 uint32_t thrown = min(stream.get_postgap(), (uint32_t)output_left);
929 output_left -= thrown;
930 postgap_thrown = true;
932 if(next_block >= blocks && !output_left) {
933 //Zerofill remainder.
934 for(size_t i = 0; i < samples; i++)
935 data[i] = 0;
936 return;
938 unsigned maxcopy = min(static_cast<unsigned>(samples), output_left);
939 if(maxcopy) {
940 memcpy(data, output, maxcopy * sizeof(float));
941 for(size_t i = 0; i < maxcopy; i++)
942 data[i] *= lgain;
944 if(maxcopy < output_left && maxcopy)
945 memmove(output, output + maxcopy, (output_left - maxcopy) * sizeof(float));
946 output_left -= maxcopy;
947 samples -= maxcopy;
948 data += maxcopy;
952 void opus_playback_stream::skip(uint64_t samples)
954 //Adjust for preskip and declare all preskip already thrown away.
955 pregap_thrown = stream.get_pregap();
956 samples += pregap_thrown;
957 postgap_thrown = false;
958 //First, skip inside decoded samples.
959 if(samples < output_left) {
960 //Skipping less than amount in output buffer. Just discard from output buffer and try
961 //to decode a new block.
962 memmove(output, output + samples, (output_left - samples) * sizeof(float));
963 output_left -= samples;
964 decode_block();
965 return;
966 } else {
967 //Skipping at least the amount of samples in output buffer. First, blank the output buffer
968 //and count those towards samples discarded.
969 samples -= output_left;
970 output_left = 0;
972 //While number of samples is so great that adequate convergence period can be ensured without
973 //decoding this packet, just skip the samples from the packet.
974 while(samples > OPUS_CONVERGE_MAX) {
975 samples -= stream.packet_length(next_block++);
976 //Did we hit EOF?
977 if(next_block >= blocks)
978 return;
980 //Okay, we are near the point. Start decoding packets.
981 while(samples > 0) {
982 decode_block();
983 //Did we hit EOF?
984 if(next_block >= blocks && !output_left)
985 return;
986 //Skip as many samples as possible.
987 unsigned maxskip = min(static_cast<unsigned>(samples), output_left);
988 if(maxskip < output_left)
989 memmove(output, output + maxskip, (output_left - maxskip) * sizeof(float));
990 output_left -= maxskip;
991 samples -= maxskip;
993 //Just to be nice, decode a extra block.
994 decode_block();
998 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
999 //Collection of streams.
1000 struct stream_collection
1002 public:
1003 //Create a new collection.
1004 //Can throw.
1005 stream_collection(filesystem::ref filesys);
1006 //Destroy a collection. All streams are destroyed but not deleted.
1007 ~stream_collection();
1008 //Get list of streams active at given point.
1009 std::list<uint64_t> streams_at(uint64_t point);
1010 //Add a stream into collection.
1011 //Can throw.
1012 uint64_t add_stream(opus_stream& stream);
1013 //Get the filesystem this collection is for.
1014 filesystem::ref get_filesystem() { return fs; }
1015 //Unlock all streams in collection.
1016 void unlock_all();
1017 //Get stream with given index (NULL if not found).
1018 opus_stream* get_stream(uint64_t index)
1020 threads::alock m(mlock);
1021 if(streams.count(index)) {
1022 streams[index]->get_ref();
1023 return streams[index];
1025 return NULL;
1027 //Delete a stream.
1028 //Can throw.
1029 void delete_stream(uint64_t index);
1030 //Alter stream timebase.
1031 //Can throw.
1032 void alter_stream_timebase(uint64_t index, uint64_t newts);
1033 //Alter stream gain.
1034 void alter_stream_gain(uint64_t index, uint16_t newgain);
1035 //Enumerate all valid stream indices, in time order.
1036 std::list<uint64_t> all_streams();
1037 //Export the entiere superstream.
1038 //Can throw.
1039 void export_superstream(std::ofstream& out);
1040 private:
1041 filesystem::ref fs;
1042 uint64_t next_index;
1043 unsigned next_stream;
1044 threads::lock mlock;
1045 std::set<uint64_t> free_indices;
1046 std::map<uint64_t, uint64_t> entries;
1047 std::multimap<uint64_t, uint64_t> streams_by_time;
1048 //FIXME: Something more efficient.
1049 std::map<uint64_t, opus_stream*> streams;
1052 stream_collection::stream_collection(filesystem::ref filesys)
1053 : fs(filesys)
1055 next_stream = 0;
1056 next_index = 0;
1057 //The stream index table is in cluster 2.
1058 uint32_t next_cluster = 2;
1059 uint32_t next_offset = 0;
1060 uint32_t i = 0;
1061 try {
1062 while(true) {
1063 char buffer[16];
1064 size_t r = fs.read_data(next_cluster, next_offset, buffer, 16);
1065 if(r < 16)
1066 break;
1067 uint64_t timebase = serialization::u64b(buffer);
1068 uint32_t ctrl_cluster = serialization::u32b(buffer + 8);
1069 uint32_t data_cluster = serialization::u32b(buffer + 12);
1070 if(ctrl_cluster) {
1071 opus_stream* x = new opus_stream(timebase, fs, ctrl_cluster, data_cluster);
1072 entries[next_index] = i;
1073 streams_by_time.insert(std::make_pair(timebase, next_index));
1074 streams[next_index++] = x;
1075 } else
1076 free_indices.insert(i);
1077 next_stream = ++i;
1079 } catch(std::exception& e) {
1080 for(auto i : streams)
1081 i.second->put_ref();
1082 (stringfmt() << "Failed to parse LSVS: " << e.what()).throwex();
1086 stream_collection::~stream_collection()
1088 threads::alock m(mlock);
1089 for(auto i : streams)
1090 i.second->put_ref();
1091 streams.clear();
1094 std::list<uint64_t> stream_collection::streams_at(uint64_t point)
1096 threads::alock m(mlock);
1097 std::list<uint64_t> s;
1098 for(auto i : streams) {
1099 uint64_t start = i.second->timebase();
1100 uint64_t end = start + i.second->length();
1101 if(point >= start && point < end) {
1102 i.second->get_ref();
1103 s.push_back(i.first);
1106 return s;
1109 uint64_t stream_collection::add_stream(opus_stream& stream)
1111 uint64_t idx;
1112 try {
1113 threads::alock m(mlock);
1114 //Lock the added stream so it doesn't start playing back immediately.
1115 stream.lock();
1116 idx = next_index++;
1117 streams[idx] = &stream;
1118 char buffer[16];
1119 serialization::u64b(buffer, stream.timebase());
1120 auto r = stream.get_clusters();
1121 serialization::u32b(buffer + 8, r.first);
1122 serialization::u32b(buffer + 12, r.second);
1123 uint64_t entry_number = 0;
1124 if(free_indices.empty())
1125 entry_number = next_stream++;
1126 else {
1127 entry_number = *free_indices.begin();
1128 free_indices.erase(entry_number);
1130 uint32_t write_cluster = 2;
1131 uint32_t write_offset = 0;
1132 uint32_t dummy1, dummy2;
1133 fs.skip_data(write_cluster, write_offset, 16 * entry_number);
1134 fs.write_data(write_cluster, write_offset, buffer, 16, dummy1, dummy2);
1135 streams_by_time.insert(std::make_pair(stream.timebase(), idx));
1136 entries[idx] = entry_number;
1137 return idx;
1138 } catch(std::exception& e) {
1139 (stringfmt() << "Failed to add stream: " << e.what()).throwex();
1141 return idx;
1144 void stream_collection::unlock_all()
1146 threads::alock m(mlock);
1147 for(auto i : streams)
1148 i.second->unlock();
1151 void stream_collection::delete_stream(uint64_t index)
1153 threads::alock m(mlock);
1154 if(!entries.count(index))
1155 return;
1156 uint64_t entry_number = entries[index];
1157 uint32_t write_cluster = 2;
1158 uint32_t write_offset = 0;
1159 uint32_t dummy1, dummy2;
1160 char buffer[16] = {0};
1161 fs.skip_data(write_cluster, write_offset, 16 * entry_number);
1162 fs.write_data(write_cluster, write_offset, buffer, 16, dummy1, dummy2);
1163 auto itr = streams_by_time.lower_bound(streams[index]->timebase());
1164 auto itr2 = streams_by_time.upper_bound(streams[index]->timebase());
1165 for(auto x = itr; x != itr2; x++)
1166 if(x->second == index) {
1167 streams_by_time.erase(x);
1168 break;
1170 streams[index]->delete_stream();
1171 streams.erase(index);
1174 void stream_collection::alter_stream_timebase(uint64_t index, uint64_t newts)
1176 try {
1177 threads::alock m(mlock);
1178 if(!streams.count(index))
1179 return;
1180 if(entries.count(index)) {
1181 char buffer[8];
1182 uint32_t write_cluster = 2;
1183 uint32_t write_offset = 0;
1184 uint32_t dummy1, dummy2;
1185 serialization::u64b(buffer, newts);
1186 fs.skip_data(write_cluster, write_offset, 16 * entries[index]);
1187 fs.write_data(write_cluster, write_offset, buffer, 8, dummy1, dummy2);
1189 auto itr = streams_by_time.lower_bound(streams[index]->timebase());
1190 auto itr2 = streams_by_time.upper_bound(streams[index]->timebase());
1191 for(auto x = itr; x != itr2; x++)
1192 if(x->second == index) {
1193 streams_by_time.erase(x);
1194 break;
1196 streams[index]->timebase(newts);
1197 streams_by_time.insert(std::make_pair(newts, index));
1198 } catch(std::exception& e) {
1199 (stringfmt() << "Failed to alter stream timebase: " << e.what()).throwex();
1203 void stream_collection::alter_stream_gain(uint64_t index, uint16_t newgain)
1205 try {
1206 threads::alock m(mlock);
1207 if(!streams.count(index))
1208 return;
1209 streams[index]->set_gain(newgain);
1210 streams[index]->write_trailier();
1211 } catch(std::exception& e) {
1212 (stringfmt() << "Failed to alter stream gain: " << e.what()).throwex();
1216 std::list<uint64_t> stream_collection::all_streams()
1218 threads::alock m(mlock);
1219 std::list<uint64_t> s;
1220 for(auto i : streams_by_time)
1221 s.push_back(i.second);
1222 return s;
1225 void stream_collection::export_superstream(std::ofstream& out)
1227 std::list<uint64_t> slist = all_streams();
1228 //Find the total length of superstream.
1229 uint64_t len = 0;
1230 for(auto i : slist) {
1231 opus_stream* s = get_stream(i);
1232 if(s) {
1233 len = max(len, s->timebase() + s->length());
1234 s->put_ref();
1237 char header[32];
1238 serialization::u64l(header, 0x1C586F532EULL); //Magic and header size.
1239 serialization::u64l(header + 8, len);
1240 serialization::u64l(header + 16, 4676829883349860352ULL); //Sampling rate.
1241 serialization::u64l(header + 24, 1);
1242 out.write(header, 32);
1243 if(!out)
1244 throw std::runtime_error("Error writing PCM output");
1246 //Find the first valid stream.
1247 auto next_i = slist.begin();
1248 opus_stream* next_stream = NULL;
1249 while(next_i != slist.end()) {
1250 next_stream = get_stream(*next_i);
1251 next_i++;
1252 if(next_stream)
1253 break;
1255 uint64_t next_ts;
1256 next_ts = next_stream ? next_stream->timebase() : len;
1258 std::list<opus_playback_stream*> active;
1259 try {
1260 for(uint64_t s = 0; s < len;) {
1261 if(s == next_ts) {
1262 active.push_back(new opus_playback_stream(*next_stream));
1263 next_stream->put_ref();
1264 next_stream = NULL;
1265 while(next_i != slist.end()) {
1266 next_stream = get_stream(*next_i);
1267 next_i++;
1268 if(!next_stream)
1269 continue;
1270 uint64_t next_ts = next_stream->timebase();
1271 if(next_ts > s)
1272 break;
1273 //Okay, this starts too...
1274 active.push_back(new opus_playback_stream(*next_stream));
1275 next_stream->put_ref();
1276 next_stream = NULL;
1278 next_ts = next_stream ? next_stream->timebase() : len;
1280 uint64_t maxsamples = min(next_ts - s, static_cast<uint64_t>(OUTPUT_BLOCK));
1281 maxsamples = min(maxsamples, len - s);
1282 char outbuf[4 * OUTPUT_BLOCK];
1283 float buf1[OUTPUT_BLOCK];
1284 float buf2[OUTPUT_BLOCK];
1285 for(size_t t = 0; t < maxsamples; t++)
1286 buf1[t] = 0;
1287 for(auto t : active) {
1288 t->read(buf2, maxsamples);
1289 for(size_t u = 0; u < maxsamples; u++)
1290 buf1[u] += buf2[u];
1292 for(auto t = active.begin(); t != active.end();) {
1293 if((*t)->eof()) {
1294 auto todel = t;
1295 t++;
1296 delete *todel;
1297 active.erase(todel);
1298 } else
1299 t++;
1301 for(size_t t = 0; t < maxsamples; t++)
1302 serialization::s32l(outbuf + 4 * t, buf1[t] * 268435456);
1303 out.write(outbuf, 4 * maxsamples);
1304 if(!out)
1305 throw std::runtime_error("Failed to write PCM");
1306 s += maxsamples;
1308 } catch(std::exception& e) {
1309 (stringfmt() << "Failed to export PCM: " << e.what()).throwex();
1311 for(auto t = active.begin(); t != active.end();) {
1312 if((*t)->eof()) {
1313 auto todelete = t;
1314 t++;
1315 delete *todelete;
1316 active.erase(todelete);
1317 } else
1318 t++;
1322 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1323 void voicesub_state::start_management_stream(opus_stream& s)
1325 opus_playback_stream* p = new opus_playback_stream(s);
1326 threads::alock m(active_playback_streams_lock);
1327 active_playback_streams.push_back(p);
1330 void voicesub_state::advance_time(uint64_t newtime)
1332 threads::alock m2(current_collection_lock);
1333 if(!current_collection) {
1334 //Clear all.
1335 threads::alock m(active_playback_streams_lock);
1336 for(auto i : active_playback_streams)
1337 delete i;
1338 active_playback_streams.clear();
1339 return;
1341 std::list<uint64_t> sactive = current_collection->streams_at(newtime);
1342 for(auto j : sactive) {
1343 opus_stream* i = current_collection->get_stream(j);
1344 if(!i)
1345 continue;
1346 //Don't play locked streams in order to avoid double playing.
1347 threads::alock m(active_playback_streams_lock);
1348 try {
1349 if(!i->islocked())
1350 active_playback_streams.push_back(new opus_playback_stream(*i));
1351 } catch(std::exception& e) {
1352 messages << "Can't start stream: " << e.what() << std::endl;
1354 i->put_ref();
1358 void voicesub_state::jump_time(uint64_t newtime)
1360 threads::alock m2(current_collection_lock);
1361 if(!current_collection) {
1362 //Clear all.
1363 threads::alock m(active_playback_streams_lock);
1364 for(auto i : active_playback_streams)
1365 delete i;
1366 active_playback_streams.clear();
1367 return;
1369 //Close all currently playing streams.
1371 threads::alock m(active_playback_streams_lock);
1372 for(auto i : active_playback_streams)
1373 delete i;
1374 active_playback_streams.clear();
1376 //Unlock all streams, so they will play.
1377 current_collection->unlock_all();
1378 //Reopen all streams that should be open (with seeking)
1379 std::list<uint64_t> sactive = current_collection->streams_at(newtime);
1380 for(auto j : sactive) {
1381 opus_stream* i = current_collection->get_stream(j);
1382 if(!i)
1383 continue;
1384 //No need to check for locks, because we just busted all of those.
1385 uint64_t p = newtime - i->timebase();
1386 opus_playback_stream* s;
1387 try {
1388 s = new opus_playback_stream(*i);
1389 } catch(std::exception& e) {
1390 messages << "Can't start stream: " << e.what() << std::endl;
1391 return;
1393 i->put_ref();
1394 if(!s)
1395 continue;
1396 s->skip(p);
1397 threads::alock m(active_playback_streams_lock);
1398 active_playback_streams.push_back(s);
1402 //Resample.
1403 void voicesub_state::do_resample(audioapi_instance::resampler& r, float* srcbuf, size_t& srcuse,
1404 float* dstbuf, size_t& dstuse, size_t dstmax, double ratio)
1406 if(srcuse == 0 || dstuse >= dstmax)
1407 return;
1408 float* in = srcbuf;
1409 size_t in_u = srcuse;
1410 float* out = dstbuf + dstuse;
1411 size_t out_u = dstmax - dstuse;
1412 r.resample(in, in_u, out, out_u, ratio, false);
1413 size_t offset = in - srcbuf;
1414 if(offset < srcuse)
1415 memmove(srcbuf, srcbuf + offset, sizeof(float) * (srcuse - offset));
1416 srcuse -= offset;
1417 dstuse = dstmax - out_u;
1420 //Drain the input buffer.
1421 void voicesub_state::drain_input()
1423 while(audio.voice_r_status() > 0) {
1424 float buf[256];
1425 unsigned size = min(audio.voice_r_status(), 256u);
1426 audio.record_voice(buf, size);
1430 //Read the input buffer.
1431 void voicesub_state::read_input(float* buf, size_t& use, size_t maxuse)
1433 size_t rleft = audio.voice_r_status();
1434 unsigned toread = min(rleft, max(maxuse, use) - use);
1435 if(toread > 0) {
1436 audio.record_voice(buf + use, toread);
1437 use += toread;
1441 //Compress Opus block.
1442 void voicesub_state::compress_opus_block(opus::encoder& e, float* buf, size_t& use,
1443 opus_stream& active_stream, bitrate_tracker& brtrack)
1445 const size_t opus_out_max = 1276;
1446 unsigned char opus_output[opus_out_max];
1447 size_t cblock = 0;
1448 if(use >= 960)
1449 cblock = 960;
1450 else if(use >= 480)
1451 cblock = 480;
1452 else if(use >= 240)
1453 cblock = 240;
1454 else if(use >= 120)
1455 cblock = 120;
1456 else
1457 return; //No valid data to compress.
1458 const size_t opus_out_max2 = SET_opus_max_bitrate(settings) * cblock / 384000;
1459 try {
1460 size_t c = e.encode(buf, cblock, opus_output, opus_out_max2);
1461 //Successfully compressed a block.
1462 size_t opus_output_len = c;
1463 brtrack.submit(c, cblock);
1464 try {
1465 active_stream.write(cblock / 120, opus_output, opus_output_len);
1466 } catch(std::exception& e) {
1467 messages << "Error writing data: " << e.what() << std::endl;
1469 } catch(std::exception& e) {
1470 messages << "Opus encoder error: " << e.what() << std::endl;
1472 use -= cblock;
1475 void voicesub_state::update_time()
1477 uint64_t sampletime = 0;
1478 bool jumping = false;
1480 threads::alock m(time_mutex);
1481 sampletime = current_time;
1482 jumping = time_jump;
1483 time_jump = false;
1485 if(jumping)
1486 jump_time(sampletime);
1487 else
1488 advance_time(sampletime);
1491 void voicesub_state::decompress_active_streams(float* out, size_t& use)
1493 size_t base = use;
1494 use += OUTPUT_BLOCK;
1495 for(unsigned i = 0; i < OUTPUT_BLOCK; i++)
1496 out[i + base] = 0;
1497 //Do it this way to minimize the amount of time playback streams lock
1498 //is held.
1499 std::list<opus_playback_stream*> stmp;
1501 threads::alock m(active_playback_streams_lock);
1502 stmp = active_playback_streams;
1504 std::set<opus_playback_stream*> toerase;
1505 for(auto i : stmp) {
1506 float tmp[OUTPUT_BLOCK];
1507 try {
1508 i->read(tmp, OUTPUT_BLOCK);
1509 } catch(std::exception& e) {
1510 messages << "Failed to decompress: " << e.what() << std::endl;
1511 for(unsigned j = 0; j < OUTPUT_BLOCK; j++)
1512 tmp[j] = 0;
1514 for(unsigned j = 0; j < OUTPUT_BLOCK; j++)
1515 out[j + base] += tmp[j];
1516 if(i->eof())
1517 toerase.insert(i);
1520 threads::alock m(active_playback_streams_lock);
1521 for(auto i = active_playback_streams.begin(); i != active_playback_streams.end();) {
1522 if(toerase.count(*i)) {
1523 auto toerase = i;
1524 i++;
1525 delete *toerase;
1526 active_playback_streams.erase(toerase);
1527 } else
1528 i++;
1533 void voicesub_state::handle_tangent_positive_edge(opus::encoder& e, opus_stream*& active_stream,
1534 bitrate_tracker& brtrack)
1536 threads::alock m2(current_collection_lock);
1537 if(!current_collection)
1538 return;
1539 try {
1540 e.ctl(opus::reset);
1541 e.ctl(opus::bitrate(SET_opus_bitrate(settings)));
1542 brtrack.reset();
1543 uint64_t ctime;
1545 threads::alock m(time_mutex);
1546 ctime = current_time;
1548 active_stream = NULL;
1549 active_stream = new opus_stream(ctime, current_collection->get_filesystem());
1550 int32_t pregap = e.ctl(opus::lookahead);
1551 active_stream->set_pregap(pregap);
1552 } catch(std::exception& e) {
1553 messages << "Can't start stream: " << e.what() << std::endl;
1554 return;
1556 messages << "Tangent enaged." << std::endl;
1559 void voicesub_state::handle_tangent_negative_edge(opus_stream*& active_stream, bitrate_tracker& brtrack)
1561 threads::alock m2(current_collection_lock);
1562 messages << "Tangent disenaged: " << brtrack;
1563 try {
1564 active_stream->write_trailier();
1565 } catch(std::exception& e) {
1566 messages << e.what() << std::endl;
1568 if(current_collection) {
1569 try {
1570 current_collection->add_stream(*active_stream);
1571 } catch(std::exception& e) {
1572 messages << "Can't add stream: " << e.what() << std::endl;
1573 active_stream->put_ref();
1575 edispatch.voice_stream_change();
1576 } else
1577 active_stream->put_ref();
1578 active_stream = NULL;
1581 class inthread_th : public workthread
1583 public:
1584 inthread_th(voicesub_state* _internal, audioapi_instance& _audio)
1585 : internal(*_internal), audio(_audio)
1587 quit = false;
1588 quit_ack = false;
1589 rptr = 0;
1590 fire();
1592 void kill()
1594 quit = true;
1596 threads::alock h(lmut);
1597 lcond.notify_all();
1599 while(!quit_ack)
1600 usleep(100000);
1601 usleep(100000);
1603 protected:
1604 void entry()
1606 try {
1607 entry2();
1608 } catch(std::bad_alloc& e) {
1609 OOM_panic();
1610 } catch(std::exception& e) {
1611 messages << "AIEEE... Fatal exception in voice thread: " << e.what() << std::endl;
1613 quit_ack = true;
1615 void entry2()
1617 //Wait for libopus to load...
1618 size_t cbh = opus::add_callback([this]() {
1619 threads::alock h(this->lmut);
1620 this->lcond.notify_all();
1622 while(true) {
1623 threads::alock h(lmut);
1624 if(opus::libopus_loaded() || quit)
1625 break;
1626 lcond.wait(h);
1628 opus::cancel_callback(cbh);
1629 if(quit)
1630 return;
1632 opus::encoder oenc(opus::samplerate::r48k, false, opus::application::voice);
1633 oenc.ctl(opus::bitrate(SET_opus_bitrate(internal.settings)));
1634 audioapi_instance::resampler rin;
1635 audioapi_instance::resampler rout;
1636 const unsigned buf_max = 6144; //These buffers better be large.
1637 size_t buf_in_use = 0;
1638 size_t buf_inr_use = 0;
1639 size_t buf_outr_use = 0;
1640 size_t buf_out_use = 0;
1641 float buf_in[buf_max];
1642 float buf_inr[OPUS_BLOCK_SIZE];
1643 float buf_outr[OUTPUT_SIZE];
1644 float buf_out[buf_max];
1645 bitrate_tracker brtrack;
1646 opus_stream* active_stream = NULL;
1648 internal.drain_input();
1649 while(1) {
1650 if(clear_workflag(workthread::quit_request) & workthread::quit_request) {
1651 if(!internal.active_flag && active_stream)
1652 internal.handle_tangent_negative_edge(active_stream, brtrack);
1653 break;
1655 uint64_t ticks = framerate_regulator::get_utime();
1656 //Handle tangent edgets.
1657 if(internal.active_flag && !active_stream) {
1658 internal.drain_input();
1659 buf_in_use = 0;
1660 buf_inr_use = 0;
1661 internal.handle_tangent_positive_edge(oenc, active_stream, brtrack);
1663 else if((!internal.active_flag || quit) && active_stream)
1664 internal.handle_tangent_negative_edge(active_stream, brtrack);
1665 if(quit)
1666 break;
1668 //Read input, up to 25ms.
1669 unsigned rate_in = audio.voice_rate().first;
1670 unsigned rate_out = audio.voice_rate().second;
1671 size_t dbuf_max = min(buf_max, rate_in / REC_THRESHOLD_DIV);
1672 internal.read_input(buf_in, buf_in_use, dbuf_max);
1674 //Resample up to full opus block.
1675 internal.do_resample(rin, buf_in, buf_in_use, buf_inr, buf_inr_use, OPUS_BLOCK_SIZE,
1676 1.0 * OPUS_SAMPLERATE / rate_in);
1678 //If we have full opus block and recording is enabled, compress it.
1679 if(buf_inr_use >= OPUS_BLOCK_SIZE && active_stream)
1680 internal.compress_opus_block(oenc, buf_inr, buf_inr_use, *active_stream,
1681 brtrack);
1683 //Update time, starting/ending streams.
1684 internal.update_time();
1686 //Decompress active streams.
1687 if(buf_outr_use < BLOCK_THRESHOLD)
1688 internal.decompress_active_streams(buf_outr, buf_outr_use);
1690 //Resample to output rate.
1691 internal.do_resample(rout, buf_outr, buf_outr_use, buf_out, buf_out_use, buf_max,
1692 1.0 * rate_out / OPUS_SAMPLERATE);
1694 //Output stuff.
1695 if(buf_out_use > 0 && audio.voice_p_status2() < rate_out / PLAY_THRESHOLD_DIV) {
1696 audio.play_voice(buf_out, buf_out_use);
1697 buf_out_use = 0;
1700 //Sleep a bit to save CPU use.
1701 uint64_t ticks_spent = framerate_regulator::get_utime() - ticks;
1702 if(ticks_spent < ITERATION_TIME)
1703 usleep(ITERATION_TIME - ticks_spent);
1705 threads::alock h(internal.current_collection_lock);
1706 delete internal.current_collection;
1707 internal.current_collection = NULL;
1709 private:
1710 size_t rptr;
1711 double position;
1712 volatile bool quit;
1713 volatile bool quit_ack;
1714 threads::lock lmut;
1715 threads::cv lcond;
1716 voicesub_state& internal;
1717 audioapi_instance& audio;
1721 voice_commentary::voice_commentary(settingvar::group& _settings, emulator_dispatch& _dispatch,
1722 audioapi_instance& _audio, command::group& _cmd)
1723 : settings(_settings), edispatch(_dispatch), audio(_audio), cmd(_cmd),
1724 tangentp(cmd, CCOMMENTARY::p, [this]() { this->set_active_flag(true); }),
1725 tangentr(cmd, CCOMMENTARY::r, [this]() { this->set_active_flag(false); })
1727 internal = NULL;
1730 voice_commentary::~voice_commentary()
1732 if(internal)
1733 kill();
1736 //Rate is not sampling rate!
1737 void voice_commentary::frame_number(uint64_t newframe, double rate)
1739 if(!internal)
1740 return;
1741 auto _internal = get_state(internal);
1742 if(rate == _internal->last_rate && _internal->last_frame_number == newframe)
1743 return;
1744 threads::alock m(_internal->time_mutex);
1745 _internal->current_time = newframe / rate * OPUS_SAMPLERATE;
1746 if(fabs(rate - _internal->last_rate) > 1e-6 || _internal->last_frame_number + 1 != newframe)
1747 _internal->time_jump = true;
1748 _internal->last_frame_number = newframe;
1749 _internal->last_rate = rate;
1752 void voice_commentary::init()
1754 internal = new voicesub_state(settings, edispatch, audio);
1755 auto _internal = get_state(internal);
1756 try {
1757 _internal->int_task = new inthread_th(_internal, audio);
1758 } catch(...) {
1759 delete _internal;
1760 throw;
1764 void voice_commentary::kill()
1766 auto _internal = get_state(internal);
1767 _internal->int_task->kill();
1768 delete _internal->int_task;
1769 _internal->int_task = NULL;
1770 delete _internal;
1771 internal = NULL;
1774 uint64_t voice_commentary::parse_timebase(const std::string& n)
1776 std::string x = n;
1777 if(x.length() > 0 && x[x.length() - 1] == 's') {
1778 x = x.substr(0, x.length() - 1);
1779 return 48000 * parse_value<double>(x);
1780 } else
1781 return parse_value<uint64_t>(x);
1784 bool voice_commentary::collection_loaded()
1786 if(!internal) return false;
1787 auto _internal = get_state(internal);
1788 threads::alock m2(_internal->current_collection_lock);
1789 return (_internal->current_collection != NULL);
1792 std::list<voice_commentary::playback_stream_info> voice_commentary::get_stream_info()
1794 std::list<voice_commentary::playback_stream_info> in;
1795 if(!internal)
1796 return in;
1797 auto _internal = get_state(internal);
1798 threads::alock m2(_internal->current_collection_lock);
1799 if(!_internal->current_collection)
1800 return in;
1801 for(auto i : _internal->current_collection->all_streams()) {
1802 opus_stream* s = _internal->current_collection->get_stream(i);
1803 voice_commentary::playback_stream_info pi;
1804 if(!s)
1805 continue;
1806 pi.id = i;
1807 pi.base = s->timebase();
1808 pi.length = s->length();
1809 try {
1810 in.push_back(pi);
1811 } catch(...) {
1813 s->put_ref();
1815 return in;
1818 void voice_commentary::play_stream(uint64_t id)
1820 auto _internal = get_state(internal);
1821 threads::alock m2(_internal->current_collection_lock);
1822 if(!_internal->current_collection)
1823 throw std::runtime_error("No collection loaded");
1824 opus_stream* s = _internal->current_collection->get_stream(id);
1825 if(!s)
1826 return;
1827 try {
1828 _internal->start_management_stream(*s);
1829 } catch(...) {
1830 s->put_ref();
1831 throw;
1833 s->put_ref();
1836 void voice_commentary::export_stream(uint64_t id, const std::string& filename,
1837 voice_commentary::external_stream_format fmt)
1839 auto _internal = get_state(internal);
1840 threads::alock m2(_internal->current_collection_lock);
1841 if(!_internal->current_collection)
1842 throw std::runtime_error("No collection loaded");
1843 opus_stream* st = _internal->current_collection->get_stream(id);
1844 if(!st)
1845 return;
1846 std::ofstream s(filename, std::ios_base::out | std::ios_base::binary);
1847 if(!s) {
1848 st->put_ref();
1849 throw std::runtime_error("Can't open output file");
1851 try {
1852 st->export_stream(s, fmt);
1853 } catch(std::exception& e) {
1854 st->put_ref();
1855 (stringfmt() << "Export failed: " << e.what()).throwex();
1857 st->put_ref();
1860 uint64_t voice_commentary::import_stream(uint64_t ts, const std::string& filename,
1861 voice_commentary::external_stream_format fmt)
1863 auto _internal = get_state(internal);
1864 threads::alock m2(_internal->current_collection_lock);
1865 if(!_internal->current_collection)
1866 throw std::runtime_error("No collection loaded");
1868 std::ifstream s(filename, std::ios_base::in | std::ios_base::binary);
1869 if(!s)
1870 throw std::runtime_error("Can't open input file");
1871 opus_stream* st = new opus_stream(ts, _internal->current_collection->get_filesystem(), s, fmt, settings);
1872 uint64_t id;
1873 try {
1874 id = _internal->current_collection->add_stream(*st);
1875 } catch(...) {
1876 st->delete_stream();
1877 throw;
1879 st->unlock(); //Not locked.
1880 edispatch.voice_stream_change();
1881 return id;
1884 void voice_commentary::delete_stream(uint64_t id)
1886 auto _internal = get_state(internal);
1887 threads::alock m2(_internal->current_collection_lock);
1888 if(!_internal->current_collection)
1889 throw std::runtime_error("No collection loaded");
1890 _internal->current_collection->delete_stream(id);
1891 edispatch.voice_stream_change();
1894 void voice_commentary::export_superstream(const std::string& filename)
1896 auto _internal = get_state(internal);
1897 threads::alock m2(_internal->current_collection_lock);
1898 if(!_internal->current_collection)
1899 throw std::runtime_error("No collection loaded");
1900 std::ofstream s(filename, std::ios_base::out | std::ios_base::binary);
1901 if(!s)
1902 throw std::runtime_error("Can't open output file");
1903 _internal->current_collection->export_superstream(s);
1906 void voice_commentary::load_collection(const std::string& filename)
1908 auto _internal = get_state(internal);
1909 threads::alock m2(_internal->current_collection_lock);
1910 filesystem::ref newfs;
1911 stream_collection* newc;
1912 newfs = filesystem::ref(filename);
1913 newc = new stream_collection(newfs);
1914 if(_internal->current_collection)
1915 delete _internal->current_collection;
1916 _internal->current_collection = newc;
1917 edispatch.voice_stream_change();
1920 void voice_commentary::unload_collection()
1922 if(!internal) return;
1923 auto _internal = get_state(internal);
1924 threads::alock m2(_internal->current_collection_lock);
1925 if(_internal->current_collection)
1926 delete _internal->current_collection;
1927 _internal->current_collection = NULL;
1928 edispatch.voice_stream_change();
1931 void voice_commentary::alter_timebase(uint64_t id, uint64_t ts)
1933 auto _internal = get_state(internal);
1934 threads::alock m2(_internal->current_collection_lock);
1935 if(!_internal->current_collection)
1936 throw std::runtime_error("No collection loaded");
1937 _internal->current_collection->alter_stream_timebase(id, ts);
1938 edispatch.voice_stream_change();
1941 float voice_commentary::get_gain(uint64_t id)
1943 auto _internal = get_state(internal);
1944 threads::alock m2(_internal->current_collection_lock);
1945 if(!_internal->current_collection)
1946 throw std::runtime_error("No collection loaded");
1947 return _internal->current_collection->get_stream(id)->get_gain() / 256.0;
1950 void voice_commentary::set_gain(uint64_t id, float gain)
1952 auto _internal = get_state(internal);
1953 threads::alock m2(_internal->current_collection_lock);
1954 if(!_internal->current_collection)
1955 throw std::runtime_error("No collection loaded");
1956 int64_t _gain = gain * 256;
1957 if(_gain < -32768 || _gain > 32767)
1958 throw std::runtime_error("Gain out of range (+-128dB)");
1959 _internal->current_collection->alter_stream_gain(id, _gain);
1960 edispatch.voice_stream_change();
1963 double voice_commentary::ts_seconds(uint64_t ts)
1965 return ts / 48000.0;
1968 void voice_commentary::set_active_flag(bool flag)
1970 if(!internal) return;
1971 auto _internal = get_state(internal);
1972 _internal->active_flag = flag;