Real-time updates for voice streams in editor
[lsnes.git] / src / core / inthread.cpp
blob4a55b616aeafe9991684905c8ea83be70423c221
1 #include <lsnes.hpp>
2 #include <cstdint>
3 #ifdef WITH_OPUS_CODEC
4 #include "library/filesys.hpp"
5 #include "library/minmax.hpp"
6 #include "library/workthread.hpp"
7 #include "library/serialization.hpp"
8 #include "library/string.hpp"
9 #include "library/ogg.hpp"
10 #include "core/audioapi.hpp"
11 #include "core/command.hpp"
12 #include "core/dispatch.hpp"
13 #include "core/framerate.hpp"
14 #include "core/inthread.hpp"
15 #include "core/keymapper.hpp"
16 #include "core/misc.hpp"
17 #include <cmath>
18 #include <list>
19 #include <iostream>
20 #include <fstream>
21 #include <cstring>
22 #include <unistd.h>
23 #include <sys/time.h>
24 #include <zlib.h>
25 //Fsck it.
26 #define OPUS_BUILD
27 #include "opus/opus.h"
28 #include "opus/opus_defines.h"
30 //Farther than this, packets can be fastskipped.
31 #define OPUS_CONVERGE_MAX 5760
32 //Maximum size of PCM output for one packet.
33 #define OPUS_MAX_OUT 5760
34 //Output block size.
35 #define OUTPUT_BLOCK 1440
36 //Main sampling rate.
37 #define OPUS_SAMPLERATE 48000
38 //Opus block size
39 #define OPUS_BLOCK_SIZE 960
40 //Threshold for decoding additional block
41 #define BLOCK_THRESHOLD 1200
42 //Maximum output block size.
43 #define OUTPUT_SIZE (BLOCK_THRESHOLD + OUTPUT_BLOCK)
44 //Amount of microseconds per interation.
45 #define ITERATION_TIME 15000
46 //Opus bitrate to use.
47 #define OPUS_BITRATE 48000
48 //Ogg Opus granule rate.
49 #define OGGOPUS_GRANULERATE 48000
50 //Record buffer size threshold divider.
51 #define REC_THRESHOLD_DIV 40
52 //Playback buffer size threshold divider.
53 #define PLAY_THRESHOLD_DIV 30
54 //Special granule position: None.
55 #define GRANULEPOS_NONE 0xFFFFFFFFFFFFFFFFULL
57 namespace
59 class opus_playback_stream;
60 class opus_stream;
61 class stream_collection;
63 //Recording active flag.
64 volatile bool active_flag = false;
65 //Last seen frame number.
66 uint64_t last_frame_number = 0;
67 //Last seen rate.
68 double last_rate = 0;
69 //Mutex protecting current_time and time_jump.
70 mutex_class time_mutex;
71 //The current time.
72 uint64_t current_time;
73 //Time jump flag. Set if time jump is detected.
74 //If time jump is detected, all current playing streams are stopped, stream locks are cleared and
75 //apropriate streams are restarted. If time jump is false, all unlocked streams coming into range
76 //are started.
77 bool time_jump;
78 //Lock protecting active_playback_streams.
79 mutex_class active_playback_streams_lock;
80 //List of streams currently playing.
81 std::list<opus_playback_stream*> active_playback_streams;
82 //The collection of streams.
83 stream_collection* current_collection;
84 //Lock protecting current collection.
85 mutex_class current_collection_lock;
87 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
88 //Information about individual opus packet in stream.
89 struct opus_packetinfo
91 //Length is in units of 1/400th of a second.
92 opus_packetinfo(uint16_t datasize, uint8_t length, uint64_t offset)
94 descriptor = (offset & 0xFFFFFFFFFFULL) | (static_cast<uint64_t>(length) << 40) |
95 (static_cast<uint64_t>(datasize) << 48);
97 //Get the data size of the packet.
98 uint16_t size() { return descriptor >> 48; }
99 //Calculate the length of packet in samples.
100 uint16_t length() { return 120 * ((descriptor >> 40) & 0xFF); }
101 //Calculate the true offset.
102 uint64_t offset() { return descriptor & 0xFFFFFFFFFFULL; }
103 //Read the packet.
104 //Can throw.
105 std::vector<unsigned char> packet(filesystem_ref from_sys);
106 private:
107 uint64_t descriptor;
110 std::vector<unsigned char> opus_packetinfo::packet(filesystem_ref from_sys)
112 std::vector<unsigned char> ret;
113 uint64_t off = offset();
114 uint32_t sz = size();
115 uint32_t cluster = off / CLUSTER_SIZE;
116 uint32_t coff = off % CLUSTER_SIZE;
117 ret.resize(sz);
118 size_t r = from_sys.read_data(cluster, coff, &ret[0], sz);
119 if(r != sz)
120 throw std::runtime_error("Incomplete read");
121 return ret;
124 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
125 //Information about opus stream.
126 struct opus_stream
128 //Create new empty stream with specified base time.
129 opus_stream(uint64_t base, filesystem_ref filesys);
130 //Read stream with specified base time and specified start clusters.
131 //Can throw.
132 opus_stream(uint64_t base, filesystem_ref filesys, uint32_t ctrl_cluster, uint32_t data_cluster);
133 //Import a stream with specified base time.
134 //Can throw.
135 opus_stream(uint64_t base, filesystem_ref filesys, std::ifstream& data,
136 external_stream_format extfmt);
137 //Delete this stream (also puts a ref)
138 void delete_stream() { deleting = true; put_ref(); }
139 //Export a stream.
140 //Can throw.
141 void export_stream(std::ofstream& data, external_stream_format extfmt);
142 //Get length of specified packet in samples.
143 uint16_t packet_length(uint32_t seqno)
145 return (seqno < packets.size()) ? packets[seqno].length() : 0;
147 //Get data of specified packet.
148 //Can throw.
149 std::vector<unsigned char> packet(uint32_t seqno)
151 return (seqno < packets.size()) ? packets[seqno].packet(fs) : std::vector<unsigned char>();
153 //Get base time in samples for stream.
154 uint64_t timebase() { return s_timebase; }
155 //Set base time in samples for stream.
156 void timebase(uint64_t ts) { s_timebase = ts; }
157 //Get length of stream in samples.
158 uint64_t length()
160 if(pregap_length + postgap_length > total_len)
161 return 0;
162 else
163 return total_len - pregap_length - postgap_length;
165 //Set the pregap length.
166 void set_pregap(uint32_t p) { pregap_length = p; }
167 //Get the pregap length.
168 uint32_t get_pregap() { return pregap_length; }
169 //Set the postgap length.
170 void set_potsgap(uint32_t p) { postgap_length = p; }
171 //Get the postgap length.
172 uint32_t get_postgap() { return postgap_length; }
173 //Set gain.
174 void set_gain(int16_t g) { gain = g; }
175 //Get gain.
176 int16_t get_gain() { return gain; }
177 //Get linear gain.
178 float get_gain_linear() { return pow(10, gain / 20); }
179 //Get number of packets in stream.
180 uint32_t blocks() { return packets.size(); }
181 //Is this stream locked?
182 bool islocked() { return locked; }
183 //Lock a stream.
184 void lock() { locked = true; }
185 //Unlock a stream.
186 void unlock() { locked = false; }
187 //Increment reference count.
188 void get_ref() { umutex_class m(reflock); refcount++; }
189 //Decrement reference count, destroying object if it hits zero.
190 void put_ref() { umutex_class m(reflock); refcount--; if(!refcount) destroy(); }
191 //Add new packet into stream.
192 //Not safe to call simultaneously with packet_length() or packet().
193 //Can throw.
194 void write(uint8_t len, const unsigned char* payload, size_t payload_len);
195 //Write stream trailer.
196 void write_trailier();
197 //Get clusters.
198 std::pair<uint32_t, uint32_t> get_clusters() { return std::make_pair(ctrl_cluster, data_cluster); }
199 private:
200 void export_stream_opusdemo(std::ofstream& data);
201 void export_stream_sox(std::ofstream& data);
202 void export_stream_oggopus(std::ofstream& data);
203 void import_stream_opusdemo(std::ifstream& data);
204 void import_stream_sox(std::ifstream& data);
205 void import_stream_oggopus(std::ifstream& data);
207 opus_stream(const opus_stream&);
208 opus_stream& operator=(const opus_stream&);
209 void destroy();
210 filesystem_ref fs;
211 std::vector<opus_packetinfo> packets;
212 uint64_t total_len;
213 uint64_t s_timebase;
214 uint32_t next_cluster;
215 uint32_t next_offset;
216 uint32_t next_mcluster;
217 uint32_t next_moffset;
218 uint32_t ctrl_cluster;
219 uint32_t data_cluster;
220 uint32_t pregap_length;
221 uint32_t postgap_length;
222 int16_t gain;
223 bool locked;
224 mutex_class reflock;
225 unsigned refcount;
226 bool deleting;
229 opus_stream::opus_stream(uint64_t base, filesystem_ref filesys)
230 : fs(filesys)
232 refcount = 1;
233 deleting = false;
234 total_len = 0;
235 s_timebase = base;
236 locked = false;
237 next_cluster = 0;
238 next_mcluster = 0;
239 next_offset = 0;
240 next_moffset = 0;
241 ctrl_cluster = 0;
242 data_cluster = 0;
243 pregap_length = 0;
244 postgap_length = 0;
245 gain = 0;
248 opus_stream::opus_stream(uint64_t base, filesystem_ref filesys, uint32_t _ctrl_cluster,
249 uint32_t _data_cluster)
250 : fs(filesys)
252 refcount = 1;
253 deleting = false;
254 total_len = 0;
255 s_timebase = base;
256 locked = false;
257 next_cluster = data_cluster = _data_cluster;
258 next_mcluster = ctrl_cluster = _ctrl_cluster;
259 next_offset = 0;
260 next_moffset = 0;
261 pregap_length = 0;
262 postgap_length = 0;
263 gain = 0;
264 //Read the data buffers.
265 char buf[CLUSTER_SIZE];
266 uint32_t last_cluster_seen = next_mcluster;
267 uint64_t total_size = 0;
268 uint64_t total_frames = 0;
269 bool trailers = false;
270 bool saved_pointer_valid = false;
271 uint32_t saved_next_mcluster = 0;
272 uint32_t saved_next_moffset = 0;
273 while(true) {
274 last_cluster_seen = next_mcluster;
275 size_t r = fs.read_data(next_mcluster, next_moffset, buf, CLUSTER_SIZE);
276 if(!r) {
277 //The stream ends here.
278 break;
280 //Find the first unused entry if any.
281 for(unsigned i = 0; i < CLUSTER_SIZE; i += 4)
282 if(!buf[i + 3] || trailers) {
283 //This entry is unused. If the next entry is also unused, that is the end.
284 //Otherwise, there might be stream trailers.
285 if(trailers && !buf[i + 3]) {
286 goto out_parsing; //Ends for real.
288 if(!trailers) {
289 //Set the trailer flag and continue parsing.
290 //The saved offset must be placed here.
291 saved_next_mcluster = last_cluster_seen;
292 saved_next_moffset = i;
293 saved_pointer_valid = true;
294 trailers = true;
295 continue;
297 //This is a trailer entry.
298 if(buf[i + 3] == 2) {
299 //Pregap.
300 pregap_length = read32ube(buf + i) >> 8;
301 } else if(buf[i + 3] == 3) {
302 //Postgap.
303 postgap_length = read32ube(buf + i) >> 8;
304 } else if(buf[i + 3] == 4) {
305 //Gain.
306 gain = read16sbe(buf + i);
308 } else {
309 uint16_t psize = read16ube(buf + i);
310 uint8_t plen = read8ube(buf + i + 2);
311 total_size += psize;
312 total_len += 120 * plen;
313 opus_packetinfo p(psize, plen, 1ULL * next_cluster * CLUSTER_SIZE +
314 next_offset);
315 size_t r2 = fs.skip_data(next_cluster, next_offset, psize);
316 if(r2 < psize)
317 throw std::runtime_error("Incomplete data stream");
318 packets.push_back(p);
319 total_frames++;
322 out_parsing:
323 //If saved pointer is valid, restore to that.
324 if(saved_pointer_valid) {
325 next_mcluster = saved_next_mcluster;
326 next_moffset = saved_next_moffset;
330 opus_stream::opus_stream(uint64_t base, filesystem_ref filesys, std::ifstream& data,
331 external_stream_format extfmt)
332 : fs(filesys)
334 refcount = 1;
335 deleting = false;
336 total_len = 0;
337 s_timebase = base;
338 locked = false;
339 next_cluster = 0;
340 next_mcluster = 0;
341 next_offset = 0;
342 next_moffset = 0;
343 ctrl_cluster = 0;
344 data_cluster = 0;
345 pregap_length = 0;
346 postgap_length = 0;
347 gain = 0;
348 if(extfmt == EXTFMT_OPUSDEMO)
349 import_stream_opusdemo(data);
350 else if(extfmt == EXTFMT_OGGOPUS)
351 import_stream_oggopus(data);
352 else if(extfmt == EXTFMT_SOX)
353 import_stream_sox(data);
356 void opus_stream::import_stream_opusdemo(std::ifstream& data)
358 int err;
359 unsigned char tmpi[65536];
360 float tmp[OPUS_MAX_OUT];
361 OpusDecoder* dec = opus_decoder_create(48000, 1, &err);
362 while(data) {
363 char head[8];
364 data.read(head, 8);
365 if(!data)
366 continue;
367 uint32_t psize = read32ube(head);
368 uint32_t pstate = read32ube(head + 4);
369 if(psize > sizeof(tmpi)) {
370 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
371 if(data_cluster) fs.free_cluster_chain(data_cluster);
372 opus_decoder_destroy(dec);
373 throw std::runtime_error("Packet too large to decode");
375 data.read(reinterpret_cast<char*>(tmpi), psize);
376 if(!data) {
377 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
378 if(data_cluster) fs.free_cluster_chain(data_cluster);
379 opus_decoder_destroy(dec);
380 throw std::runtime_error("Error reading opus packet");
382 int r = opus_decode_float(dec, tmpi, psize, tmp,
383 OPUS_MAX_OUT, 0);
384 if(r < 0) {
385 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
386 if(data_cluster) fs.free_cluster_chain(data_cluster);
387 opus_decoder_destroy(dec);
388 (stringfmt() << "Error decoding opus packet: " << opus_strerror(r)).throwex();
390 uint32_t cstate;
391 opus_decoder_ctl(dec, OPUS_GET_FINAL_RANGE(&cstate));
392 if(cstate != pstate) {
393 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
394 if(data_cluster) fs.free_cluster_chain(data_cluster);
395 opus_decoder_destroy(dec);
396 throw std::runtime_error("Opus packet checksum mismatch");
398 r = opus_decoder_get_nb_samples(dec, tmpi, psize);
399 if(r < 0 || r % 120) {
400 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
401 if(data_cluster) fs.free_cluster_chain(data_cluster);
402 opus_decoder_destroy(dec);
403 throw std::runtime_error("Error getting length of opus packet");
405 uint8_t plen = r / 120;
406 try {
407 write(plen, tmpi, psize);
408 } catch(...) {
409 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
410 if(data_cluster) fs.free_cluster_chain(data_cluster);
411 opus_decoder_destroy(dec);
412 throw;
415 opus_decoder_destroy(dec);
416 try {
417 write_trailier();
418 } catch(...) {
419 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
420 if(data_cluster) fs.free_cluster_chain(data_cluster);
421 throw;
425 class oggopus_importer
427 public:
428 struct packet
430 const unsigned char* data;
431 size_t size;
432 unsigned units;
434 oggopus_importer();
435 void put_page(const ogg_page& p);
436 bool packet_pending();
437 packet get_packet();
438 size_t get_postgap();
439 private:
440 oggopus_importer(const oggopus_importer&);
441 oggopus_importer& operator=(const oggopus_importer&);
442 std::vector<uint8_t> incomplete;
443 std::vector<uint8_t> pincomplete;
444 const ogg_page* curpage;
445 size_t curpacket;
446 size_t pagepackets;
447 size_t postgap;
448 bool eospage;
449 bool incomplete_f;
450 uint64_t expected_granule;
451 bool granule_correction_done;
454 oggopus_importer::oggopus_importer()
456 curpacket = 0;
457 curpage = 0;
458 pagepackets = 0;
459 postgap = 0;
460 eospage = false;
461 incomplete_f = false;
462 expected_granule = 0;
463 granule_correction_done = false;
466 void oggopus_importer::put_page(const ogg_page& p)
468 //If not continued packet, drop the last packet.
469 if(!p.get_continue() && incomplete_f)
470 messages << "Warning: Incomplete packet not continued by the next page" << std::endl;
471 if(!p.get_continue())
472 incomplete.clear();
473 //If totally empty page...
474 if(p.get_packet_count() == 0) {
475 if(p.get_granulepos() != ogg_page::granulepos_none)
476 (stringfmt() << "Bad granulepos in empty page: Expected -1, got "
477 << p.get_granulepos()).throwex();
478 curpacket = 0;
479 pagepackets = 0;
480 curpage = &p;
481 return;
483 //If continued packet, paste packet 0 to previous.
484 size_t tmppackets = p.get_packet_count();
485 bool tmp_incomplete;
486 if((tmp_incomplete = p.get_last_packet_incomplete()))
487 tmppackets--;
488 if(tmppackets == 0 && p.get_granulepos() != ogg_page::granulepos_none)
489 (stringfmt() << "Bad granulepos in page with no complete packets: Expected -1, got "
490 << p.get_granulepos()).throwex();
491 if(tmppackets > 0 && p.get_granulepos() == ogg_page::granulepos_none)
492 (stringfmt() << "Bad granulepos in page with complete packets: Expected != -1, got "
493 << "-1").throwex();
494 auto p0 = p.get_packet(0);
495 size_t off = incomplete.size();
496 incomplete.resize(off + p0.second);
497 if(p0.second)
498 memcpy(&incomplete[off], p0.first, p0.second);
499 incomplete_f = tmp_incomplete;
500 pagepackets = tmppackets;
501 curpacket = 0;
502 curpage = &p;
503 eospage = p.get_eos();
506 bool oggopus_importer::packet_pending()
508 return (curpacket < pagepackets);
511 oggopus_importer::packet oggopus_importer::get_packet()
513 oggopus_importer::packet p;
514 if(curpacket == 0) {
515 //Packet 0 is special.
516 pincomplete = incomplete;
517 p.data = &pincomplete[0];
518 p.size = pincomplete.size();
519 } else {
520 auto pn = curpage->get_packet(curpacket);
521 p.data = pn.first;
522 p.size = pn.second;
524 curpacket++;
525 if(curpacket == pagepackets && incomplete_f) {
526 //Copy the incomplete page to buffer.
527 auto pn = curpage->get_packet(pagepackets);
528 incomplete.resize(pn.second);
529 if(pn.second)
530 memcpy(&incomplete[0], pn.first, pn.second);
532 if(!p.size) {
533 messages << "Warning, skipping empty Opus packet" << std::endl;
534 p.units = 0;
535 } else {
536 int frames = opus_packet_get_nb_frames(p.data, p.size);
537 int samples_pf = opus_packet_get_samples_per_frame(p.data, OGGOPUS_GRANULERATE);
538 if(frames < 0 || samples_pf < 0)
539 (stringfmt() << "Bad Opus packet").throwex();
540 p.units = frames * samples_pf / 120;
542 expected_granule += p.units * 120;
543 if(curpacket == pagepackets && curpage->get_eos()) {
544 if(curpage->get_eos()) {
545 //The postgap is expected_granule - granulepos.
546 if(curpage->get_granulepos() > expected_granule)
547 (stringfmt() << "Page granule too large, expected maximum of " <<
548 expected_granule << ", got " << curpage->get_granulepos()).throwex();
549 postgap = expected_granule - curpage->get_granulepos();
550 if(postgap > p.units * 120) {
551 messages << "Warning, postgap too large, clipped to last packet" << std::endl;
552 postgap = p.units * 120;
554 } else if(!granule_correction_done)
555 expected_granule = curpage->get_granulepos();
556 else
557 (stringfmt() << "Page granule invalid, expected " <<
558 expected_granule << ", got " << curpage->get_granulepos()).throwex();
559 granule_correction_done = true;
561 return p;
564 size_t oggopus_importer::get_postgap()
566 return postgap;
569 void opus_stream::import_stream_oggopus(std::ifstream& data)
571 ogg_stream_reader_iostreams reader(data);
572 ogg_page page;
573 oggopus_importer importer;
574 uint32_t stream_seq = 0; //The imprinting duckling model.
575 int state = 0; //Not locked.
576 size_t advance;
577 uint32_t last_page_seen = 0xFFFFFFFFUL;
578 bool seen_data = false;
579 reader.set_errors_to(messages);
580 try {
581 while(reader.get_page(page)) {
582 if(state && page.get_stream() != stream_seq)
583 continue; //Wrong stream.
584 if(state && page.get_sequence() != last_page_seen + 1)
585 messages << "Warning: Packet(s) missing in OggOpus stream" << std::endl;
586 last_page_seen = page.get_sequence();
587 struct oggopus_header h;
588 struct oggopus_tags t;
589 size_t packets = page.get_packet_count();
590 switch(state) {
591 case 0: //Not locked.
592 try {
593 h = parse_oggopus_header(page);
594 } catch(...) {
595 continue;
597 if(h.streams != 1)
598 throw std::runtime_error("Multistream OggOpus streams are not "
599 "supported");
600 state = 1; //Expecting comment.
601 pregap_length = h.preskip;
602 gain = h.gain;
603 stream_seq = page.get_stream();
604 last_page_seen = page.get_sequence();
605 break;
606 case 1: //Expecting comment.
607 t = parse_oggopus_tags(page);
608 state = 2; //Data page.
609 if(page.get_eos())
610 throw std::runtime_error("Empty OggOpus stream");
611 //We don't do anything with this.
612 break;
613 case 2: //Data page.
614 importer.put_page(page);
615 while(importer.packet_pending()) {
616 auto p = importer.get_packet();
617 if(p.units)
618 write(p.units, p.data, p.size);
620 if(page.get_eos()) {
621 state = 3; //End of stream.
622 goto out;
626 out:
627 if(state == 0)
628 throw std::runtime_error("No OggOpus stream found");
629 if(state == 1)
630 throw std::runtime_error("Oggopus stream missing required tags page");
631 if(state == 2)
632 messages << "Warning: Incomplete Oggopus stream." << std::endl;
633 postgap_length = importer.get_postgap();
634 write_trailier();
635 } catch(...) {
636 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
637 if(data_cluster) fs.free_cluster_chain(data_cluster);
638 throw;
642 void opus_stream::import_stream_sox(std::ifstream& data)
644 int err;
645 unsigned char tmpi[65536];
646 float tmp[OPUS_MAX_OUT];
647 char header[260];
648 data.read(header, 32);
649 if(!data)
650 throw std::runtime_error("Can't read .sox header");
651 if(read32ule(header + 0) != 0x586F532EULL)
652 throw std::runtime_error("Bad .sox header magic");
653 if(read8ube(header + 4) > 28)
654 data.read(header + 32, read8ube(header + 4) - 28);
655 if(!data)
656 throw std::runtime_error("Can't read .sox header");
657 if(read64ule(header + 16) != 4676829883349860352ULL)
658 throw std::runtime_error("Bad .sox sampling rate");
659 if(read32ule(header + 24) != 1)
660 throw std::runtime_error("Only mono streams are supported");
661 uint64_t samples = read64ule(header + 8);
662 OpusEncoder* enc = opus_encoder_create(48000, 1, OPUS_APPLICATION_VOIP, &err);
663 opus_encoder_ctl(enc, OPUS_SET_BITRATE(OPUS_BITRATE));
664 int pregap;
665 opus_encoder_ctl(enc, OPUS_GET_LOOKAHEAD(&pregap));
666 pregap_length = pregap;
667 for(uint64_t i = 0; i < samples + pregap; i += OPUS_BLOCK_SIZE) {
668 size_t bs = OPUS_BLOCK_SIZE;
669 if(i + bs > samples + pregap)
670 bs = samples + pregap - i;
671 //We have to read zero bytes after the end of stream.
672 size_t readable = bs;
673 if(readable + i > samples)
674 readable = max(samples, i) - i;
675 if(readable > 0)
676 data.read(reinterpret_cast<char*>(tmpi), 4 * readable);
677 if(readable < bs)
678 memset(tmpi + 4 * readable, 0, 4 * (bs - readable));
679 if(!data) {
680 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
681 if(data_cluster) fs.free_cluster_chain(data_cluster);
682 opus_encoder_destroy(enc);
683 throw std::runtime_error("Can't read .sox data");
685 for(size_t j = 0; j < bs; j++)
686 tmp[j] = static_cast<float>(read32sle(tmpi + 4 * j)) / 268435456;
687 if(bs < OPUS_BLOCK_SIZE)
688 postgap_length = OPUS_BLOCK_SIZE - bs;
689 for(size_t j = bs; j < OPUS_BLOCK_SIZE; j++)
690 tmp[j] = 0;
691 int r = opus_encode_float(enc, tmp, OPUS_BLOCK_SIZE, tmpi, sizeof(tmpi));
692 if(r < 0) {
693 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
694 if(data_cluster) fs.free_cluster_chain(data_cluster);
695 opus_encoder_destroy(enc);
696 (stringfmt() << "Error encoding opus packet: " << opus_strerror(r)).throwex();
698 try {
699 write(OPUS_BLOCK_SIZE / 120, tmpi, r);
700 } catch(...) {
701 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
702 if(data_cluster) fs.free_cluster_chain(data_cluster);
703 opus_encoder_destroy(enc);
704 throw;
707 opus_encoder_destroy(enc);
708 try {
709 write_trailier();
710 } catch(...) {
711 if(ctrl_cluster) fs.free_cluster_chain(ctrl_cluster);
712 if(data_cluster) fs.free_cluster_chain(data_cluster);
713 throw;
717 void opus_stream::destroy()
719 if(deleting) {
720 //We catch the errors and print em, because otherwise put_ref could throw, which would
721 //be too much.
722 try {
723 fs.free_cluster_chain(ctrl_cluster);
724 } catch(std::exception& e) {
725 messages << "Failed to delete stream control file: " << e.what();
727 try {
728 fs.free_cluster_chain(data_cluster);
729 } catch(std::exception& e) {
730 messages << "Failed to delete stream data file: " << e.what();
733 delete this;
736 void opus_stream::export_stream_opusdemo(std::ofstream& data)
738 int err;
739 OpusDecoder* dec = opus_decoder_create(48000, 1, &err);
740 std::vector<unsigned char> p;
741 float tmp[OPUS_MAX_OUT];
742 for(size_t i = 0; i < packets.size(); i++) {
743 char head[8];
744 unsigned state;
745 try {
746 p = packet(i);
747 } catch(std::exception& e) {
748 opus_decoder_destroy(dec);
749 (stringfmt() << "Error reading opus packet: " << e.what()).throwex();
751 int r = opus_decode_float(dec, &p[0], p.size(), tmp, OPUS_MAX_OUT, 0);
752 if(r < 0) {
753 opus_decoder_destroy(dec);
754 (stringfmt() << "Error decoding opus packet: " << opus_strerror(r)).throwex();
756 opus_decoder_ctl(dec, OPUS_GET_FINAL_RANGE(&state));
757 write32ube(head + 0, p.size());
758 write32ube(head + 4, state);
759 data.write(head, 8);
760 data.write(reinterpret_cast<char*>(&p[0]), p.size());
761 if(!data) {
762 opus_decoder_destroy(dec);
763 throw std::runtime_error("Error writing opus packet");
766 opus_decoder_destroy(dec);
769 void opus_stream::export_stream_oggopus(std::ofstream& data)
771 oggopus_header header;
772 oggopus_tags tags;
773 ogg_stream_writer_iostreams writer(data);
774 //Headers / Tags.
775 header.version = 1;
776 header.channels = 1;
777 header.preskip = pregap_length;
778 header.rate = OPUS_SAMPLERATE;
779 header.gain = 0;
780 header.map_family = 0;
781 header.streams = 1;
782 header.coupled = 0;
783 header.chanmap[0] = 0;
784 memset(header.chanmap + 1, 255, 254);
785 tags.vendor = "lsnes rr" + lsnes_version;
786 tags.comments.push_back((stringfmt() << "LSNES_STREAM_TS=" << s_timebase).str());
787 struct ogg_page hpage = serialize_oggopus_header(header);
788 struct ogg_page tpage = serialize_oggopus_tags(tags);
789 struct ogg_page ppage;
790 uint64_t true_granule = 0;
791 uint32_t seq = 2;
792 //Empty stream?
793 if(!packets.size())
794 tpage.set_eos(true);
795 writer.put_page(hpage);
796 writer.put_page(tpage);
797 for(size_t i = 0; i < packets.size(); i++) {
798 std::vector<unsigned char> p;
799 try {
800 p = packet(i);
801 } catch(std::exception& e) {
802 (stringfmt() << "Error reading opus packet: " << e.what()).throwex();
804 if(!p.size())
805 (stringfmt() << "Empty Opus packet is not valid").throwex();
806 int frames = opus_packet_get_nb_frames(&p[0], p.size());
807 int samples_pf = opus_packet_get_samples_per_frame(&p[0], OGGOPUS_GRANULERATE);
808 if(frames < 0 || samples_pf < 0)
809 (stringfmt() << "Bad Opus packet").throwex();
810 if(!ppage.append_packet(&p[0], p.size())) {
811 //Won't fit.
812 ppage.set_granulepos(true_granule);
813 ppage.set_sequence(seq++);
814 writer.put_page(ppage);
815 ppage = ogg_page(); //Reset.
816 if(!ppage.append_packet(&p[0], p.size()))
817 throw std::runtime_error("Internal error: Opus packet larger than page");
819 true_granule += frames * samples_pf;
821 ppage.set_eos(true);
822 ppage.set_sequence(seq++);
823 ppage.set_granulepos(true_granule - postgap_length);
824 writer.put_page(ppage);
827 void opus_stream::export_stream_sox(std::ofstream& data)
829 int err;
830 OpusDecoder* dec = opus_decoder_create(48000, 1, &err);
831 std::vector<unsigned char> p;
832 float tmp[OPUS_MAX_OUT];
833 char header[32];
834 write64ule(header, 0x1C586F532EULL); //Magic and header size.
835 write64ule(header + 16, 4676829883349860352ULL); //Sampling rate.
836 write32ule(header + 24, 1);
837 uint64_t tlen = 0;
838 uint32_t lookahead_thrown = 0;
839 data.write(header, 32);
840 if(!data) {
841 opus_decoder_destroy(dec);
842 throw std::runtime_error("Error writing PCM data.");
844 float lgain = get_gain_linear();
845 for(size_t i = 0; i < packets.size(); i++) {
846 char blank[4] = {0, 0, 0, 0};
847 std::vector<unsigned char> p;
848 try {
849 p = packet(i);
850 } catch(std::exception& e) {
851 opus_decoder_destroy(dec);
852 (stringfmt() << "Error reading opus packet: " << e.what()).throwex();
854 uint32_t len = packet_length(i);
855 int r = opus_decode_float(dec, &p[0], p.size(), tmp, OPUS_MAX_OUT, 0);
856 bool is_last = (i == packets.size() - 1);
857 uint32_t pregap_throw = 0;
858 uint32_t postgap_throw = 0;
859 if(r < 0) {
860 opus_decoder_destroy(dec);
861 (stringfmt() << "Error decoding opus packet: " << opus_strerror(r)).throwex();
863 if(lookahead_thrown < pregap_length) {
864 //We haven't yet thrown the full pregap. Throw some.
865 uint32_t maxthrow = pregap_length - lookahead_thrown;
866 pregap_throw = min(len, maxthrow);
867 lookahead_thrown += pregap_length;
869 if(is_last)
870 postgap_throw = min(len - pregap_throw, postgap_length);
871 tlen += (len - pregap_throw - postgap_throw);
872 for(uint32_t j = pregap_throw; j < len - postgap_throw; j++) {
873 int32_t s = (int32_t)(tmp[j] * lgain * 268435456.0);
874 write32sle(blank, s);
875 data.write(blank, 4);
876 if(!data)
877 throw std::runtime_error("Error writing PCM data.");
880 data.seekp(0, std::ios_base::beg);
881 write64ule(header + 8, tlen);
882 data.write(header, 32);
883 if(!data) {
884 opus_decoder_destroy(dec);
885 throw std::runtime_error("Error writing PCM data.");
887 opus_decoder_destroy(dec);
890 void opus_stream::export_stream(std::ofstream& data, external_stream_format extfmt)
892 if(extfmt == EXTFMT_OPUSDEMO)
893 export_stream_opusdemo(data);
894 else if(extfmt == EXTFMT_OGGOPUS)
895 export_stream_oggopus(data);
896 else if(extfmt == EXTFMT_SOX)
897 export_stream_sox(data);
900 void opus_stream::write(uint8_t len, const unsigned char* payload, size_t payload_len)
902 try {
903 char descriptor[4];
904 uint32_t used_cluster, used_offset;
905 uint32_t used_mcluster, used_moffset;
906 if(!next_cluster)
907 next_cluster = data_cluster = fs.allocate_cluster();
908 if(!next_mcluster)
909 next_mcluster = ctrl_cluster = fs.allocate_cluster();
910 write16ube(descriptor, payload_len);
911 write8ube(descriptor + 2, len);
912 write8ube(descriptor + 3, 1);
913 fs.write_data(next_cluster, next_offset, payload, payload_len, used_cluster, used_offset);
914 fs.write_data(next_mcluster, next_moffset, descriptor, 4, used_mcluster, used_moffset);
915 uint64_t off = static_cast<uint64_t>(used_cluster) * CLUSTER_SIZE + used_offset;
916 opus_packetinfo p(payload_len, len, off);
917 total_len += p.length();
918 packets.push_back(p);
919 } catch(std::exception& e) {
920 (stringfmt() << "Can't write opus packet: " << e.what()).throwex();
924 void opus_stream::write_trailier()
926 try {
927 char descriptor[16];
928 uint32_t used_mcluster, used_moffset;
929 //The allocation must be done for real.
930 if(!next_mcluster)
931 next_mcluster = ctrl_cluster = fs.allocate_cluster();
932 //But the write must not update the pointers..
933 uint32_t tmp_mcluster = next_mcluster;
934 uint32_t tmp_moffset = next_moffset;
935 write32ube(descriptor, 0);
936 write32ube(descriptor + 4, (pregap_length << 8) | 0x02);
937 write32ube(descriptor + 8, (postgap_length << 8) | 0x03);
938 write16sbe(descriptor + 12, gain);
939 write16ube(descriptor + 14, 0x0004);
940 fs.write_data(tmp_mcluster, tmp_moffset, descriptor, 16, used_mcluster, used_moffset);
941 } catch(std::exception& e) {
942 (stringfmt() << "Can't write stream trailer: " << e.what()).throwex();
947 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
948 //Playing opus stream.
949 struct opus_playback_stream
951 //Create a new playing stream from given opus stream.
952 opus_playback_stream(opus_stream& data);
953 //Destroy playing opus stream.
954 ~opus_playback_stream();
955 //Read samples from stream.
956 //Can throw.
957 void read(float* data, size_t samples);
958 //Skip samples from stream.
959 //Can throw.
960 void skip(uint64_t samples);
961 //Has the stream already ended?
962 bool eof();
963 private:
964 opus_playback_stream(const opus_playback_stream&);
965 opus_playback_stream& operator=(const opus_playback_stream&);
966 //Can throw.
967 void decode_block();
968 float output[OPUS_MAX_OUT];
969 unsigned output_left;
970 uint32_t pregap_thrown;
971 bool postgap_thrown;
972 OpusDecoder* decoder;
973 opus_stream& stream;
974 uint32_t next_block;
975 uint32_t blocks;
978 opus_playback_stream::opus_playback_stream(opus_stream& data)
979 : stream(data)
981 int err;
982 stream.get_ref();
983 stream.lock();
984 next_block = 0;
985 output_left = 0;
986 pregap_thrown = 0;
987 postgap_thrown = false;
988 blocks = stream.blocks();
989 decoder = opus_decoder_create(OPUS_SAMPLERATE, 1, &err);
990 if(!decoder)
991 throw std::bad_alloc();
994 opus_playback_stream::~opus_playback_stream()
996 //No, we don't unlock the stream.
997 stream.put_ref();
998 opus_decoder_destroy(decoder);
1001 bool opus_playback_stream::eof()
1003 return (next_block >= blocks && !output_left);
1006 void opus_playback_stream::decode_block()
1008 if(next_block >= blocks)
1009 return;
1010 if(output_left >= OPUS_MAX_OUT)
1011 return;
1012 unsigned plen = stream.packet_length(next_block);
1013 if(plen + output_left > OPUS_MAX_OUT)
1014 return;
1015 std::vector<unsigned char> pdata = stream.packet(next_block);
1016 int c = opus_decode_float(decoder, &pdata[0], pdata.size(), output + output_left,
1017 OPUS_MAX_OUT - output_left, 0);
1018 if(c > 0)
1019 output_left = min(output_left + c, static_cast<unsigned>(OPUS_MAX_OUT));
1020 else {
1021 //Bad packet, insert silence.
1022 for(unsigned i = 0; i < plen; i++)
1023 output[output_left++] = 0;
1025 //Throw the pregap away if needed.
1026 if(pregap_thrown < stream.get_pregap()) {
1027 uint32_t throw_amt = min(stream.get_pregap() - pregap_thrown, (uint32_t)output_left);
1028 if(throw_amt && throw_amt < output_left)
1029 memmove(output, output + throw_amt, (output_left - throw_amt) * sizeof(float));
1030 output_left -= throw_amt;
1031 pregap_thrown += throw_amt;
1033 next_block++;
1036 void opus_playback_stream::read(float* data, size_t samples)
1038 float lgain = stream.get_gain_linear();
1039 while(samples > 0) {
1040 decode_block();
1041 if(next_block >= blocks && !postgap_thrown) {
1042 //This is the final packet. Throw away postgap samples at the end.
1043 uint32_t thrown = min(stream.get_postgap(), (uint32_t)output_left);
1044 output_left -= thrown;
1045 postgap_thrown = true;
1047 if(next_block >= blocks && !output_left) {
1048 //Zerofill remainder.
1049 for(size_t i = 0; i < samples; i++)
1050 data[i] = 0;
1051 return;
1053 unsigned maxcopy = min(static_cast<unsigned>(samples), output_left);
1054 if(maxcopy) {
1055 memcpy(data, output, maxcopy * sizeof(float));
1056 for(size_t i = 0; i < maxcopy; i++)
1057 data[i] *= lgain;
1059 if(maxcopy < output_left && maxcopy)
1060 memmove(output, output + maxcopy, (output_left - maxcopy) * sizeof(float));
1061 output_left -= maxcopy;
1062 samples -= maxcopy;
1063 data += maxcopy;
1067 void opus_playback_stream::skip(uint64_t samples)
1069 //Adjust for preskip and declare all preskip already thrown away.
1070 pregap_thrown = stream.get_pregap();
1071 samples += pregap_thrown;
1072 postgap_thrown = false;
1073 //First, skip inside decoded samples.
1074 if(samples < output_left) {
1075 //Skipping less than amount in output buffer. Just discard from output buffer and try
1076 //to decode a new block.
1077 memmove(output, output + samples, (output_left - samples) * sizeof(float));
1078 output_left -= samples;
1079 decode_block();
1080 return;
1081 } else {
1082 //Skipping at least the amount of samples in output buffer. First, blank the output buffer
1083 //and count those towards samples discarded.
1084 samples -= output_left;
1085 output_left = 0;
1087 //While number of samples is so great that adequate convergence period can be ensured without
1088 //decoding this packet, just skip the samples from the packet.
1089 while(samples > OPUS_CONVERGE_MAX) {
1090 samples -= stream.packet_length(next_block++);
1091 //Did we hit EOF?
1092 if(next_block >= blocks)
1093 return;
1095 //Okay, we are near the point. Start decoding packets.
1096 while(samples > 0) {
1097 decode_block();
1098 //Did we hit EOF?
1099 if(next_block >= blocks && !output_left)
1100 return;
1101 //Skip as many samples as possible.
1102 unsigned maxskip = min(static_cast<unsigned>(samples), output_left);
1103 if(maxskip < output_left)
1104 memmove(output, output + maxskip, (output_left - maxskip) * sizeof(float));
1105 output_left -= maxskip;
1106 samples -= maxskip;
1108 //Just to be nice, decode a extra block.
1109 decode_block();
1113 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1114 //Collection of streams.
1115 struct stream_collection
1117 public:
1118 //Create a new collection.
1119 //Can throw.
1120 stream_collection(filesystem_ref filesys);
1121 //Destroy a collection. All streams are destroyed but not deleted.
1122 ~stream_collection();
1123 //Get list of streams active at given point.
1124 std::list<uint64_t> streams_at(uint64_t point);
1125 //Add a stream into collection.
1126 //Can throw.
1127 uint64_t add_stream(opus_stream& stream);
1128 //Get the filesystem this collection is for.
1129 filesystem_ref get_filesystem() { return fs; }
1130 //Unlock all streams in collection.
1131 void unlock_all();
1132 //Get stream with given index (NULL if not found).
1133 opus_stream* get_stream(uint64_t index)
1135 umutex_class m(mutex);
1136 if(streams.count(index)) {
1137 streams[index]->get_ref();
1138 return streams[index];
1140 return NULL;
1142 //Delete a stream.
1143 //Can throw.
1144 void delete_stream(uint64_t index);
1145 //Alter stream timebase.
1146 //Can throw.
1147 void alter_stream_timebase(uint64_t index, uint64_t newts);
1148 //Enumerate all valid stream indices, in time order.
1149 std::list<uint64_t> all_streams();
1150 //Export the entiere superstream.
1151 //Can throw.
1152 void export_superstream(std::ofstream& out);
1153 private:
1154 filesystem_ref fs;
1155 uint64_t next_index;
1156 unsigned next_stream;
1157 mutex_class mutex;
1158 std::set<uint64_t> free_indices;
1159 std::map<uint64_t, uint64_t> entries;
1160 std::multimap<uint64_t, uint64_t> streams_by_time;
1161 //FIXME: Something more efficient.
1162 std::map<uint64_t, opus_stream*> streams;
1165 stream_collection::stream_collection(filesystem_ref filesys)
1166 : fs(filesys)
1168 next_stream = 0;
1169 next_index = 0;
1170 //The stream index table is in cluster 2.
1171 uint32_t next_cluster = 2;
1172 uint32_t next_offset = 0;
1173 uint32_t i = 0;
1174 try {
1175 while(true) {
1176 char buffer[16];
1177 size_t r = fs.read_data(next_cluster, next_offset, buffer, 16);
1178 if(r < 16)
1179 break;
1180 uint64_t timebase = read64ube(buffer);
1181 uint32_t ctrl_cluster = read32ube(buffer + 8);
1182 uint32_t data_cluster = read32ube(buffer + 12);
1183 if(ctrl_cluster) {
1184 opus_stream* x = new opus_stream(timebase, fs, ctrl_cluster, data_cluster);
1185 entries[next_index] = i;
1186 streams_by_time.insert(std::make_pair(timebase, next_index));
1187 streams[next_index++] = x;
1188 } else
1189 free_indices.insert(i);
1190 next_stream = ++i;
1192 } catch(std::exception& e) {
1193 for(auto i : streams)
1194 i.second->put_ref();
1195 (stringfmt() << "Failed to parse LSVS: " << e.what()).throwex();
1199 stream_collection::~stream_collection()
1201 umutex_class m(mutex);
1202 for(auto i : streams)
1203 i.second->put_ref();
1204 streams.clear();
1207 std::list<uint64_t> stream_collection::streams_at(uint64_t point)
1209 umutex_class m(mutex);
1210 std::list<uint64_t> s;
1211 for(auto i : streams) {
1212 uint64_t start = i.second->timebase();
1213 uint64_t end = start + i.second->length();
1214 if(point >= start && point < end) {
1215 i.second->get_ref();
1216 s.push_back(i.first);
1219 return s;
1222 uint64_t stream_collection::add_stream(opus_stream& stream)
1224 try {
1225 umutex_class m(mutex);
1226 //Lock the added stream so it doesn't start playing back immediately.
1227 stream.lock();
1228 uint64_t idx = next_index++;
1229 streams[idx] = &stream;
1230 char buffer[16];
1231 write64ube(buffer, stream.timebase());
1232 auto r = stream.get_clusters();
1233 write32ube(buffer + 8, r.first);
1234 write32ube(buffer + 12, r.second);
1235 uint64_t entry_number = 0;
1236 if(free_indices.empty())
1237 entry_number = next_stream++;
1238 else {
1239 entry_number = *free_indices.begin();
1240 free_indices.erase(entry_number);
1242 uint32_t write_cluster = 2;
1243 uint32_t write_offset = 0;
1244 uint32_t dummy1, dummy2;
1245 fs.skip_data(write_cluster, write_offset, 16 * entry_number);
1246 fs.write_data(write_cluster, write_offset, buffer, 16, dummy1, dummy2);
1247 streams_by_time.insert(std::make_pair(stream.timebase(), idx));
1248 entries[idx] = entry_number;
1249 return idx;
1250 } catch(std::exception& e) {
1251 (stringfmt() << "Failed to add stream: " << e.what()).throwex();
1255 void stream_collection::unlock_all()
1257 umutex_class m(mutex);
1258 for(auto i : streams)
1259 i.second->unlock();
1262 void stream_collection::delete_stream(uint64_t index)
1264 umutex_class m(mutex);
1265 if(!entries.count(index))
1266 return;
1267 uint64_t entry_number = entries[index];
1268 uint32_t write_cluster = 2;
1269 uint32_t write_offset = 0;
1270 uint32_t dummy1, dummy2;
1271 char buffer[16] = {0};
1272 fs.skip_data(write_cluster, write_offset, 16 * entry_number);
1273 fs.write_data(write_cluster, write_offset, buffer, 16, dummy1, dummy2);
1274 auto itr = streams_by_time.lower_bound(streams[index]->timebase());
1275 auto itr2 = streams_by_time.upper_bound(streams[index]->timebase());
1276 for(auto x = itr; x != itr2; x++)
1277 if(x->second == index) {
1278 streams_by_time.erase(x);
1279 break;
1281 streams[index]->delete_stream();
1282 streams.erase(index);
1285 void stream_collection::alter_stream_timebase(uint64_t index, uint64_t newts)
1287 try {
1288 umutex_class m(mutex);
1289 if(!streams.count(index))
1290 return;
1291 if(entries.count(index)) {
1292 char buffer[8];
1293 uint32_t write_cluster = 2;
1294 uint32_t write_offset = 0;
1295 uint32_t dummy1, dummy2;
1296 write64ube(buffer, newts);
1297 fs.skip_data(write_cluster, write_offset, 16 * entries[index]);
1298 fs.write_data(write_cluster, write_offset, buffer, 8, dummy1, dummy2);
1300 auto itr = streams_by_time.lower_bound(streams[index]->timebase());
1301 auto itr2 = streams_by_time.upper_bound(streams[index]->timebase());
1302 for(auto x = itr; x != itr2; x++)
1303 if(x->second == index) {
1304 streams_by_time.erase(x);
1305 break;
1307 streams[index]->timebase(newts);
1308 streams_by_time.insert(std::make_pair(newts, index));
1309 } catch(std::exception& e) {
1310 (stringfmt() << "Failed to alter stream timebase: " << e.what()).throwex();
1314 std::list<uint64_t> stream_collection::all_streams()
1316 umutex_class m(mutex);
1317 std::list<uint64_t> s;
1318 for(auto i : streams_by_time)
1319 s.push_back(i.second);
1320 return s;
1323 void stream_collection::export_superstream(std::ofstream& out)
1325 std::list<uint64_t> slist = all_streams();
1326 //Find the total length of superstream.
1327 uint64_t len = 0;
1328 for(auto i : slist) {
1329 opus_stream* s = get_stream(i);
1330 if(s) {
1331 len = max(len, s->timebase() + s->length());
1332 s->put_ref();
1335 char header[32];
1336 write64ule(header, 0x1C586F532EULL); //Magic and header size.
1337 write64ule(header + 8, len);
1338 write64ule(header + 16, 4676829883349860352ULL); //Sampling rate.
1339 write64ule(header + 24, 1);
1340 out.write(header, 32);
1341 if(!out)
1342 throw std::runtime_error("Error writing PCM output");
1344 //Find the first valid stream.
1345 auto next_i = slist.begin();
1346 opus_stream* next_stream = NULL;
1347 while(next_i != slist.end()) {
1348 next_stream = get_stream(*next_i);
1349 next_i++;
1350 if(next_stream)
1351 break;
1353 uint64_t next_ts;
1354 next_ts = next_stream ? next_stream->timebase() : len;
1356 std::list<opus_playback_stream*> active;
1357 try {
1358 for(uint64_t s = 0; s < len;) {
1359 if(s == next_ts) {
1360 active.push_back(new opus_playback_stream(*next_stream));
1361 next_stream->put_ref();
1362 next_stream = NULL;
1363 while(next_i != slist.end()) {
1364 next_stream = get_stream(*next_i);
1365 next_i++;
1366 if(!next_stream)
1367 continue;
1368 uint64_t next_ts = next_stream->timebase();
1369 if(next_ts > s)
1370 break;
1371 //Okay, this starts too...
1372 active.push_back(new opus_playback_stream(*next_stream));
1373 next_stream->put_ref();
1374 next_stream = NULL;
1376 next_ts = next_stream ? next_stream->timebase() : len;
1378 uint64_t maxsamples = min(next_ts - s, static_cast<uint64_t>(OUTPUT_BLOCK));
1379 maxsamples = min(maxsamples, len - s);
1380 char outbuf[4 * OUTPUT_BLOCK];
1381 float buf1[OUTPUT_BLOCK];
1382 float buf2[OUTPUT_BLOCK];
1383 for(size_t t = 0; t < maxsamples; t++)
1384 buf1[t] = 0;
1385 for(auto t : active) {
1386 t->read(buf2, maxsamples);
1387 for(size_t u = 0; u < maxsamples; u++)
1388 buf1[u] += buf2[u];
1390 for(auto t = active.begin(); t != active.end();) {
1391 if((*t)->eof()) {
1392 auto todel = t;
1393 t++;
1394 delete *todel;
1395 active.erase(todel);
1396 } else
1397 t++;
1399 for(size_t t = 0; t < maxsamples; t++)
1400 write32sle(outbuf + 4 * t, buf1[t] * 268435456);
1401 out.write(outbuf, 4 * maxsamples);
1402 if(!out)
1403 throw std::runtime_error("Failed to write PCM");
1404 s += maxsamples;
1406 } catch(std::exception& e) {
1407 (stringfmt() << "Failed to export PCM: " << e.what()).throwex();
1409 for(auto t = active.begin(); t != active.end();) {
1410 if((*t)->eof()) {
1411 auto todelete = t;
1412 t++;
1413 delete *todelete;
1414 active.erase(todelete);
1415 } else
1416 t++;
1420 ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
1421 void start_management_stream(opus_stream& s)
1423 opus_playback_stream* p = new opus_playback_stream(s);
1424 umutex_class m(active_playback_streams_lock);
1425 active_playback_streams.push_back(p);
1428 void advance_time(uint64_t newtime)
1430 umutex_class m2(current_collection_lock);
1431 if(!current_collection) {
1432 //Clear all.
1433 umutex_class m(active_playback_streams_lock);
1434 for(auto i : active_playback_streams)
1435 delete i;
1436 active_playback_streams.clear();
1437 return;
1439 std::list<uint64_t> sactive = current_collection->streams_at(newtime);
1440 for(auto j : sactive) {
1441 opus_stream* i = current_collection->get_stream(j);
1442 if(!i)
1443 continue;
1444 //Don't play locked streams in order to avoid double playing.
1445 umutex_class m(active_playback_streams_lock);
1446 try {
1447 if(!i->islocked())
1448 active_playback_streams.push_back(new opus_playback_stream(*i));
1449 } catch(std::exception& e) {
1450 messages << "Can't start stream: " << e.what() << std::endl;
1452 i->put_ref();
1456 void jump_time(uint64_t newtime)
1458 umutex_class m2(current_collection_lock);
1459 if(!current_collection) {
1460 //Clear all.
1461 umutex_class m(active_playback_streams_lock);
1462 for(auto i : active_playback_streams)
1463 delete i;
1464 active_playback_streams.clear();
1465 return;
1467 //Close all currently playing streams.
1469 umutex_class m(active_playback_streams_lock);
1470 for(auto i : active_playback_streams)
1471 delete i;
1472 active_playback_streams.clear();
1474 //Unlock all streams, so they will play.
1475 current_collection->unlock_all();
1476 //Reopen all streams that should be open (with seeking)
1477 std::list<uint64_t> sactive = current_collection->streams_at(newtime);
1478 for(auto j : sactive) {
1479 opus_stream* i = current_collection->get_stream(j);
1480 if(!i)
1481 continue;
1482 //No need to check for locks, because we just busted all of those.
1483 uint64_t p = newtime - i->timebase();
1484 opus_playback_stream* s;
1485 try {
1486 s = new opus_playback_stream(*i);
1487 } catch(std::exception& e) {
1488 messages << "Can't start stream: " << e.what() << std::endl;
1490 i->put_ref();
1491 if(!s)
1492 continue;
1493 s->skip(p);
1494 umutex_class m(active_playback_streams_lock);
1495 active_playback_streams.push_back(s);
1499 //Resample.
1500 void do_resample(audioapi_resampler& r, float* srcbuf, size_t& srcuse, float* dstbuf, size_t& dstuse,
1501 size_t dstmax, double ratio)
1503 if(srcuse == 0 || dstuse >= dstmax)
1504 return;
1505 float* in = srcbuf;
1506 size_t in_u = srcuse;
1507 float* out = dstbuf + dstuse;
1508 size_t out_u = dstmax - dstuse;
1509 r.resample(in, in_u, out, out_u, ratio, false);
1510 size_t offset = in - srcbuf;
1511 if(offset < srcuse)
1512 memmove(srcbuf, srcbuf + offset, sizeof(float) * (srcuse - offset));
1513 srcuse -= offset;
1514 dstuse = dstmax - out_u;
1517 //Drain the input buffer.
1518 void drain_input()
1520 while(audioapi_voice_r_status() > 0) {
1521 float buf[256];
1522 unsigned size = min(audioapi_voice_r_status(), 256u);
1523 audioapi_record_voice(buf, size);
1527 //Read the input buffer.
1528 void read_input(float* buf, size_t& use, size_t maxuse)
1530 size_t rleft = audioapi_voice_r_status();
1531 unsigned toread = min(rleft, max(maxuse, use) - use);
1532 if(toread > 0) {
1533 audioapi_record_voice(buf + use, toread);
1534 use += toread;
1538 //Compress Opus block.
1539 void compress_opus_block(OpusEncoder* e, float* buf, size_t& use, opus_stream& active_stream,
1540 double& total_compressed, double& total_blocks)
1542 const size_t opus_out_max = 1276;
1543 unsigned char opus_output[opus_out_max];
1544 size_t cblock = 0;
1545 if(use >= 960)
1546 cblock = 960;
1547 else if(use >= 480)
1548 cblock = 480;
1549 else if(use >= 240)
1550 cblock = 240;
1551 else if(use >= 120)
1552 cblock = 120;
1553 else
1554 return; //No valid data to compress.
1556 int c = opus_encode_float(e, buf, cblock, opus_output, opus_out_max);
1557 if(c > 0) {
1558 //Successfully compressed a block.
1559 size_t opus_output_len = c;
1560 total_compressed += c;
1561 total_blocks++;
1562 try {
1563 active_stream.write(cblock / 120, opus_output, opus_output_len);
1564 } catch(std::exception& e) {
1565 messages << "Error writing data: " << e.what() << std::endl;
1567 } else
1568 messages << "Error from Opus encoder: " << opus_strerror(c) << std::endl;
1569 use -= cblock;
1572 void update_time()
1574 uint64_t sampletime;
1575 bool jumping;
1577 umutex_class m(time_mutex);
1578 sampletime = current_time;
1579 jumping = time_jump;
1580 time_jump = false;
1582 if(jumping)
1583 jump_time(sampletime);
1584 else
1585 advance_time(sampletime);
1588 void decompress_active_streams(float* out, size_t& use)
1590 size_t base = use;
1591 use += OUTPUT_BLOCK;
1592 for(unsigned i = 0; i < OUTPUT_BLOCK; i++)
1593 out[i + base] = 0;
1594 //Do it this way to minimize the amount of time playback streams lock
1595 //is held.
1596 std::list<opus_playback_stream*> stmp;
1598 umutex_class m(active_playback_streams_lock);
1599 stmp = active_playback_streams;
1601 std::set<opus_playback_stream*> toerase;
1602 for(auto i : stmp) {
1603 float tmp[OUTPUT_BLOCK];
1604 try {
1605 i->read(tmp, OUTPUT_BLOCK);
1606 } catch(std::exception& e) {
1607 messages << "Failed to decompress: " << e.what() << std::endl;
1608 for(unsigned j = 0; j < OUTPUT_BLOCK; j++)
1609 tmp[j] = 0;
1611 for(unsigned j = 0; j < OUTPUT_BLOCK; j++)
1612 out[j + base] += tmp[j];
1613 if(i->eof())
1614 toerase.insert(i);
1617 umutex_class m(active_playback_streams_lock);
1618 for(auto i = active_playback_streams.begin(); i != active_playback_streams.end();) {
1619 if(toerase.count(*i)) {
1620 auto toerase = i;
1621 i++;
1622 delete *toerase;
1623 active_playback_streams.erase(toerase);
1624 } else
1625 i++;
1630 void handle_tangent_positive_edge(OpusEncoder* e, opus_stream*& active_stream,
1631 double& total_compressed, double& total_blocks)
1633 umutex_class m2(current_collection_lock);
1634 if(!current_collection)
1635 return;
1636 static unsigned output_seq = 0;
1637 opus_encoder_ctl(e, OPUS_RESET_STATE);
1638 total_compressed = 0;
1639 total_blocks = 0;
1640 uint64_t ctime;
1642 umutex_class m(time_mutex);
1643 ctime = current_time;
1645 active_stream = NULL;
1646 try {
1647 active_stream = new opus_stream(ctime, current_collection->get_filesystem());
1648 int pregap;
1649 opus_encoder_ctl(e, OPUS_GET_LOOKAHEAD(&pregap));
1650 active_stream->set_pregap(pregap);
1651 } catch(std::exception& e) {
1652 messages << "Can't start stream: " << e.what() << std::endl;
1653 return;
1655 messages << "Tangent positive edge." << std::endl;
1658 void handle_tangent_negative_edge(opus_stream*& active_stream, double total_compressed,
1659 double total_blocks)
1661 umutex_class m2(current_collection_lock);
1662 messages << "Tangent negative edge. "
1663 << total_compressed << " bytes in " << total_blocks << " blocks, "
1664 << (0.4 * total_compressed / total_blocks) << " kbps" << std::endl;
1665 active_stream->write_trailier();
1666 if(current_collection) {
1667 try {
1668 current_collection->add_stream(*active_stream);
1669 } catch(std::exception& e) {
1670 messages << "Can't add stream: " << e.what() << std::endl;
1671 active_stream->put_ref();
1673 information_dispatch::do_voice_stream_change();
1674 } else
1675 active_stream->put_ref();
1676 active_stream = NULL;
1679 class inthread_th : public worker_thread
1681 public:
1682 inthread_th()
1684 rptr = 0;
1685 fire();
1687 void kill()
1689 quit = true;
1690 while(!quit_ack)
1691 usleep(100000);
1692 usleep(100000);
1694 protected:
1695 void entry()
1697 try {
1698 entry2();
1699 } catch(std::bad_alloc& e) {
1700 OOM_panic();
1701 } catch(std::exception& e) {
1702 messages << "AIEEE... Fatal exception in voice thread: " << e.what() << std::endl;
1704 quit_ack = true;
1706 void entry2()
1708 const size_t f = sizeof(float);
1709 double position = 0;
1710 int err;
1711 OpusEncoder* oenc = opus_encoder_create(OPUS_SAMPLERATE, 1, OPUS_APPLICATION_VOIP, &err);
1712 opus_encoder_ctl(oenc, OPUS_SET_BITRATE(OPUS_BITRATE));
1713 audioapi_resampler rin;
1714 audioapi_resampler rout;
1715 const unsigned buf_max = 6144; //These buffers better be large.
1716 size_t buf_in_use = 0;
1717 size_t buf_inr_use = 0;
1718 size_t buf_outr_use = 0;
1719 size_t buf_out_use = 0;
1720 float buf_in[buf_max];
1721 float buf_inr[OPUS_BLOCK_SIZE];
1722 float buf_outr[OUTPUT_SIZE];
1723 float buf_out[buf_max];
1724 double total_compressed = 0;
1725 double total_blocks = 0;
1726 opus_stream* active_stream = NULL;
1728 drain_input();
1729 while(1) {
1730 uint64_t ticks = get_utime();
1731 //Handle tangent edgets.
1732 if(active_flag && !active_stream) {
1733 drain_input();
1734 buf_in_use = 0;
1735 buf_inr_use = 0;
1736 handle_tangent_positive_edge(oenc, active_stream, total_compressed,
1737 total_blocks);
1739 else if((!active_flag || quit) && active_stream)
1740 handle_tangent_negative_edge(active_stream, total_compressed, total_blocks);
1741 if(quit)
1742 break;
1744 //Read input, up to 25ms.
1745 unsigned rate = audioapi_voice_rate();
1746 size_t dbuf_max = min(buf_max, rate / REC_THRESHOLD_DIV);
1747 read_input(buf_in, buf_in_use, dbuf_max);
1749 //Resample up to full opus block.
1750 do_resample(rin, buf_in, buf_in_use, buf_inr, buf_inr_use, OPUS_BLOCK_SIZE,
1751 1.0 * OPUS_SAMPLERATE / rate);
1753 //If we have full opus block and recording is enabled, compress it.
1754 if(buf_inr_use >= OPUS_BLOCK_SIZE && active_stream)
1755 compress_opus_block(oenc, buf_inr, buf_inr_use, *active_stream,
1756 total_compressed, total_blocks);
1758 //Update time, starting/ending streams.
1759 update_time();
1761 //Decompress active streams.
1762 if(buf_outr_use < BLOCK_THRESHOLD)
1763 decompress_active_streams(buf_outr, buf_outr_use);
1765 //Resample to output rate.
1766 do_resample(rout, buf_outr, buf_outr_use, buf_out, buf_out_use, buf_max,
1767 1.0 * rate / OPUS_SAMPLERATE);
1769 //Output stuff.
1770 if(buf_out_use > 0 && audioapi_voice_p_status2() < rate / PLAY_THRESHOLD_DIV) {
1771 audioapi_play_voice(buf_out, buf_out_use);
1772 buf_out_use = 0;
1775 //Sleep a bit to save CPU use.
1776 uint64_t ticks_spent = get_utime() - ticks;
1777 if(ticks_spent < ITERATION_TIME)
1778 usleep(ITERATION_TIME - ticks_spent);
1780 opus_encoder_destroy(oenc);
1781 delete current_collection;
1783 private:
1784 size_t rptr;
1785 double position;
1786 volatile bool quit;
1787 volatile bool quit_ack;
1790 //The tangent function.
1791 function_ptr_command<> ptangent("+tangent", "Voice tangent",
1792 "Syntax: +tangent\nVoice tangent.\n",
1793 []() throw(std::bad_alloc, std::runtime_error) {
1794 active_flag = true;
1796 function_ptr_command<> ntangent("-tangent", "Voice tangent",
1797 "Syntax: -tangent\nVoice tangent.\n",
1798 []() throw(std::bad_alloc, std::runtime_error) {
1799 active_flag = false;
1802 inthread_th* int_task;
1806 void voice_frame_number(uint64_t newframe, double rate)
1808 if(rate == last_rate && last_frame_number == newframe)
1809 return;
1810 umutex_class m(time_mutex);
1811 current_time = newframe / rate * OPUS_SAMPLERATE;
1812 if(fabs(rate - last_rate) > 1e-6 || last_frame_number + 1 != newframe)
1813 time_jump = true;
1814 last_frame_number = newframe;
1815 last_rate = rate;
1818 void voicethread_task()
1820 int_task = new inthread_th;
1823 void voicethread_kill()
1825 int_task->kill();
1826 int_task = NULL;
1829 uint64_t voicesub_parse_timebase(const std::string& n)
1831 std::string x = n;
1832 if(x.length() > 0 && x[x.length() - 1] == 's') {
1833 x = x.substr(0, x.length() - 1);
1834 return 48000 * parse_value<double>(x);
1835 } else
1836 return parse_value<uint64_t>(x);
1839 namespace
1841 function_ptr_command<> list_streams("list-streams", "List streams ", "list-streams\nList known voice streams",
1842 []() throw(std::bad_alloc, std::runtime_error) {
1843 umutex_class m2(current_collection_lock);
1844 if(!current_collection) {
1845 messages << "No voice streams loaded." << std::endl;
1846 return;
1848 messages << "-----------------------" << std::endl;
1849 for(auto i : current_collection->all_streams()) {
1850 opus_stream* s = current_collection->get_stream(i);
1851 if(!s)
1852 continue;
1853 messages << "ID #" << i << ": base=" << s->timebase() << " ("
1854 << (s->timebase() / 48000.0) << "s), length=" << s->length() << " ("
1855 << (s->length() / 48000.0) << "s)" << std::endl;
1856 s->put_ref();
1858 messages << "-----------------------" << std::endl;
1861 function_ptr_command<const std::string&> delete_stream("delete-stream", "Delete a stream",
1862 "delete-stream <id>\nDelete a voice stream with given ID.",
1863 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
1864 umutex_class m2(current_collection_lock);
1865 uint64_t id = parse_value<uint64_t>(x);
1866 if(!current_collection) {
1867 messages << "No voice streams loaded." << std::endl;
1868 return;
1870 opus_stream* s = current_collection->get_stream(id);
1871 if(!s) {
1872 messages << "Error, no such stream found." << std::endl;
1873 return;
1875 s->put_ref();
1876 current_collection->delete_stream(id);
1877 information_dispatch::do_voice_stream_change();
1878 messages << "Deleted stream #" << id << "." << std::endl;
1881 function_ptr_command<const std::string&> play_stream("play-stream", "Play a stream", "play-stream <id>\n"
1882 "Play a voice stream with given ID.",
1883 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
1884 umutex_class m2(current_collection_lock);
1885 uint64_t id = parse_value<uint64_t>(x);
1886 if(!current_collection) {
1887 messages << "No voice streams loaded." << std::endl;
1888 return;
1890 opus_stream* s = current_collection->get_stream(id);
1891 if(!s) {
1892 messages << "Error, no such stream found." << std::endl;
1893 return;
1895 try {
1896 start_management_stream(*s);
1897 } catch(...) {
1898 s->put_ref();
1899 throw;
1901 s->put_ref();
1902 messages << "Playing stream #" << id << "." << std::endl;
1905 function_ptr_command<const std::string&> change_timebase("change-timebase", "Change stream timebase",
1906 "change-timebase <id> <newbase>\nChange timebase of given stream",
1907 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
1908 umutex_class m2(current_collection_lock);
1909 if(!current_collection) {
1910 messages << "No voice streams loaded." << std::endl;
1911 return;
1913 auto r = regex("([0-9]+)[ \t]+([^ \t]*)", x);
1914 if(!r) {
1915 messages << "Syntax: change-timebase <id> <timebase>" << std::endl;
1916 return;
1918 uint64_t id = parse_value<uint64_t>(r[1]);
1919 uint64_t tbase = voicesub_parse_timebase(r[2]);
1920 opus_stream* s = current_collection->get_stream(id);
1921 if(!s) {
1922 messages << "Error, no such stream found." << std::endl;
1923 return;
1925 s->put_ref();
1926 current_collection->alter_stream_timebase(id, tbase);
1927 information_dispatch::do_voice_stream_change();
1928 messages << "Timebase of stream #" << id << " is now " << (tbase / 48000.0) << "s"
1929 << std::endl;
1932 void import_cmd_common(const std::string& x, const char* postfix, external_stream_format mode)
1934 umutex_class m2(current_collection_lock);
1935 if(!current_collection) {
1936 messages << "No voice streams loaded." << std::endl;
1937 return;
1939 auto r = regex("([^ \t]+)[ \t]+(.+)", x);
1940 if(!r) {
1941 messages << "Syntax: import-stream-" << postfix << " <timebase> <filename>" << std::endl;
1942 return;
1944 uint64_t tbase = voicesub_parse_timebase(r[1]);
1945 std::string fname = r[2];
1946 std::ifstream s(fname, std::ios_base::in | std::ios_base::binary);
1947 if(!s) {
1948 messages << "Can't open '" << fname << "'" << std::endl;
1949 return;
1951 opus_stream* st = new opus_stream(tbase, current_collection->get_filesystem(), s, mode);
1952 uint64_t id;
1953 try {
1954 id = current_collection->add_stream(*st);
1955 } catch(...) {
1956 st->delete_stream();
1957 throw;
1959 st->unlock(); //Not locked.
1960 information_dispatch::do_voice_stream_change();
1961 messages << "Imported stream (" << st->length() / 48000.0 << "s) as ID #" << id << std::endl;
1964 function_ptr_command<const std::string&> import_stream_c("import-stream-opus", "Import a opus stream",
1965 "import-stream-opus <timebase> <filename>\nImport opus stream from <filename>, starting at "
1966 "<timebase>",
1967 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
1968 import_cmd_common(x, "opus", EXTFMT_OPUSDEMO);
1971 function_ptr_command<const std::string&> import_stream_p("import-stream-pcm", "Import a PCM stream",
1972 "import-stream-pcm <timebase> <filename>\nImport PCM stream from <filename>, starting at <timebase>",
1973 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
1974 import_cmd_common(x, "pcm", EXTFMT_SOX);
1977 function_ptr_command<const std::string&> import_stream_o("import-stream-ogg", "Import a OggOpus stream",
1978 "import-stream-ogg <timebase> <filename>\nImport OggOpus stream from <filename>, starting at "
1979 "<timebase>",
1980 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
1981 import_cmd_common(x, "ogg", EXTFMT_OGGOPUS);
1984 void export_cmd_common(const std::string& x, const char* postfix, external_stream_format mode)
1986 umutex_class m2(current_collection_lock);
1987 if(!current_collection) {
1988 messages << "No voice streams loaded." << std::endl;
1989 return;
1991 auto r = regex("([0-9]+)[ \t]+(.+)", x);
1992 if(!r) {
1993 messages << "Syntax: export-stream-" << postfix << " <id> <filename>" << std::endl;
1994 return;
1996 uint64_t id = parse_value<uint64_t>(r[1]);
1997 std::string fname = r[2];
1998 std::ofstream s(fname, std::ios_base::out | std::ios_base::binary);
1999 if(!s) {
2000 messages << "Can't open '" << fname << "'" << std::endl;
2001 return;
2003 opus_stream* st = current_collection->get_stream(id);
2004 if(!st) {
2005 messages << "Error, stream #" << id << " does not exist." << std::endl;
2006 return;
2008 try {
2009 st->export_stream(s, mode);
2010 messages << "Exported stream #" << id << " (" << st->length() / 48000.0 << "s)" << std::endl;
2011 } catch(std::exception& e) {
2012 messages << "Export failed: " << e.what();
2014 st->put_ref();
2017 function_ptr_command<const std::string&> export_stream_c("export-stream-opus", "Export a opus stream",
2018 "export-stream-opus <id> <filename>\nExport opus stream <id> to <filename>",
2019 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
2020 export_cmd_common(x, "opus", EXTFMT_OPUSDEMO);
2023 function_ptr_command<const std::string&> export_stream_p("export-stream-pcm", "Export a PCM stream",
2024 "export-stream-pcm <id> <filename>\nExport PCM stream <id> to <filename>",
2025 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
2026 export_cmd_common(x, "pcm", EXTFMT_SOX);
2029 function_ptr_command<const std::string&> export_stream_o("export-stream-ogg", "Export a OggOpus stream",
2030 "export-stream-ogg <id> <filename>\nExport OggOpus stream <id> to <filename>",
2031 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
2032 export_cmd_common(x, "ogg", EXTFMT_OGGOPUS);
2035 function_ptr_command<const std::string&> export_sstream("export-superstream", "Export superstream",
2036 "export-superstream <filename>\nExport PCM superstream to <filename>",
2037 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
2038 umutex_class m2(current_collection_lock);
2039 if(!current_collection)
2040 return;
2041 std::ofstream s(x, std::ios_base::out | std::ios_base::binary);
2042 if(!s) {
2043 messages << "Can't open '" << x << "'" << std::endl;
2044 return;
2046 current_collection->export_superstream(s);
2047 messages << "Superstream exported." << std::endl;
2050 function_ptr_command<const std::string&> load_collection("load-collection", "Load voice subtitling "
2051 "collection", "load-collection <filename>\nLoad voice subtitling collection from <filename>",
2052 [](const std::string& x) throw(std::bad_alloc, std::runtime_error) {
2053 umutex_class m2(current_collection_lock);
2054 filesystem_ref newfs;
2055 stream_collection* newc;
2056 try {
2057 newfs = filesystem_ref(x);
2058 newc = new stream_collection(newfs);
2059 } catch(std::exception& e) {
2060 messages << "Can't load '" << x << "': " << e.what();
2061 return;
2063 if(current_collection)
2064 delete current_collection;
2065 current_collection = newc;
2066 information_dispatch::do_voice_stream_change();
2067 messages << "Loaded '" << x << "'" << std::endl;
2070 function_ptr_command<> unload_collection("unload-collection", "Unload voice subtitling collection",
2071 "unload-collection\nUnload voice subtitling collection",
2072 []() throw(std::bad_alloc, std::runtime_error) {
2073 umutex_class m2(current_collection_lock);
2074 if(current_collection)
2075 delete current_collection;
2076 current_collection = NULL;
2077 information_dispatch::do_voice_stream_change();
2078 messages << "Collection unloaded" << std::endl;
2081 inverse_key itangent("+tangent", "Movie‣Voice tangent");
2084 bool voicesub_collection_loaded()
2086 umutex_class m2(current_collection_lock);
2087 return (current_collection != NULL);
2090 std::list<playback_stream_info> voicesub_get_stream_info()
2092 umutex_class m2(current_collection_lock);
2093 std::list<playback_stream_info> in;
2094 if(!current_collection)
2095 return in;
2096 for(auto i : current_collection->all_streams()) {
2097 opus_stream* s = current_collection->get_stream(i);
2098 playback_stream_info pi;
2099 if(!s)
2100 continue;
2101 pi.id = i;
2102 pi.base = s->timebase();
2103 pi.length = s->length();
2104 try {
2105 in.push_back(pi);
2106 } catch(...) {
2108 s->put_ref();
2110 return in;
2113 void voicesub_play_stream(uint64_t id)
2115 umutex_class m2(current_collection_lock);
2116 if(!current_collection)
2117 throw std::runtime_error("No collection loaded");
2118 opus_stream* s = current_collection->get_stream(id);
2119 if(!s)
2120 return;
2121 try {
2122 start_management_stream(*s);
2123 } catch(...) {
2124 s->put_ref();
2125 throw;
2127 s->put_ref();
2130 void voicesub_export_stream(uint64_t id, const std::string& filename, external_stream_format fmt)
2132 umutex_class m2(current_collection_lock);
2133 if(!current_collection)
2134 throw std::runtime_error("No collection loaded");
2135 opus_stream* st = current_collection->get_stream(id);
2136 if(!st)
2137 return;
2138 std::ofstream s(filename, std::ios_base::out | std::ios_base::binary);
2139 if(!s) {
2140 st->put_ref();
2141 throw std::runtime_error("Can't open output file");
2143 try {
2144 st->export_stream(s, fmt);
2145 } catch(std::exception& e) {
2146 st->put_ref();
2147 (stringfmt() << "Export failed: " << e.what()).throwex();
2149 st->put_ref();
2152 uint64_t voicesub_import_stream(uint64_t ts, const std::string& filename, external_stream_format fmt)
2154 umutex_class m2(current_collection_lock);
2155 if(!current_collection)
2156 throw std::runtime_error("No collection loaded");
2158 std::ifstream s(filename, std::ios_base::in | std::ios_base::binary);
2159 if(!s)
2160 throw std::runtime_error("Can't open input file");
2161 opus_stream* st = new opus_stream(ts, current_collection->get_filesystem(), s, fmt);
2162 uint64_t id;
2163 try {
2164 id = current_collection->add_stream(*st);
2165 } catch(...) {
2166 st->delete_stream();
2167 throw;
2169 st->unlock(); //Not locked.
2170 information_dispatch::do_voice_stream_change();
2171 return id;
2174 void voicesub_delete_stream(uint64_t id)
2176 umutex_class m2(current_collection_lock);
2177 if(!current_collection)
2178 throw std::runtime_error("No collection loaded");
2179 current_collection->delete_stream(id);
2180 information_dispatch::do_voice_stream_change();
2183 void voicesub_export_superstream(const std::string& filename)
2185 umutex_class m2(current_collection_lock);
2186 if(!current_collection)
2187 throw std::runtime_error("No collection loaded");
2188 std::ofstream s(filename, std::ios_base::out | std::ios_base::binary);
2189 if(!s)
2190 throw std::runtime_error("Can't open output file");
2191 current_collection->export_superstream(s);
2194 void voicesub_load_collection(const std::string& filename)
2196 umutex_class m2(current_collection_lock);
2197 filesystem_ref newfs;
2198 stream_collection* newc;
2199 newfs = filesystem_ref(filename);
2200 newc = new stream_collection(newfs);
2201 if(current_collection)
2202 delete current_collection;
2203 current_collection = newc;
2204 information_dispatch::do_voice_stream_change();
2207 void voicesub_unload_collection()
2209 umutex_class m2(current_collection_lock);
2210 if(current_collection)
2211 delete current_collection;
2212 current_collection = NULL;
2213 information_dispatch::do_voice_stream_change();
2216 void voicesub_alter_timebase(uint64_t id, uint64_t ts)
2218 umutex_class m2(current_collection_lock);
2219 if(!current_collection)
2220 throw std::runtime_error("No collection loaded");
2221 current_collection->alter_stream_timebase(id, ts);
2222 information_dispatch::do_voice_stream_change();
2225 double voicesub_ts_seconds(uint64_t ts)
2227 return ts / 48000.0;
2229 #else
2230 void voicethread_task() {}
2231 void voice_frame_number(uint64_t newframe, double rate) {}
2232 void voicethread_kill() {}
2233 #endif