1 /***************************************************************************
3 * Open \______ \ ____ ____ | | _\_ |__ _______ ___
4 * Source | _// _ \_/ ___\| |/ /| __ \ / _ \ \/ /
5 * Jukebox | | ( <_> ) \___| < | \_\ ( <_> > < <
6 * Firmware |____|_ /\____/ \___ >__|_ \|___ /\____/__/\_ \
10 * mpegplayer buffering routines
12 * Copyright (c) 2007 Michael Sevakis
14 * This program is free software; you can redistribute it and/or
15 * modify it under the terms of the GNU General Public License
16 * as published by the Free Software Foundation; either version 2
17 * of the License, or (at your option) any later version.
19 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20 * KIND, either express or implied.
22 ****************************************************************************/
24 #include "mpegplayer.h"
26 static struct mutex disk_buf_mtx SHAREDBSS_ATTR
;
27 static struct event_queue disk_buf_queue SHAREDBSS_ATTR
;
28 static struct queue_sender_list disk_buf_queue_send SHAREDBSS_ATTR
;
29 static uint32_t disk_buf_stack
[DEFAULT_STACK_SIZE
*2/sizeof(uint32_t)];
31 struct disk_buf disk_buf SHAREDBSS_ATTR
;
32 static struct list_item nf_list
;
34 static inline void disk_buf_lock(void)
36 rb
->mutex_lock(&disk_buf_mtx
);
39 static inline void disk_buf_unlock(void)
41 rb
->mutex_unlock(&disk_buf_mtx
);
44 static inline void disk_buf_on_clear_data_notify(struct stream_hdr
*sh
)
46 DEBUGF("DISK_BUF_CLEAR_DATA_NOTIFY: 0x%02X (cleared)\n",
47 STR_FROM_HEADER(sh
)->id
);
48 list_remove_item(&sh
->nf
);
51 static int disk_buf_on_data_notify(struct stream_hdr
*sh
)
53 DEBUGF("DISK_BUF_DATA_NOTIFY: 0x%02X ", STR_FROM_HEADER(sh
)->id
);
55 if (sh
->win_left
<= sh
->win_right
)
57 /* Check if the data is already ready already */
58 if (disk_buf_is_data_ready(sh
, 0))
60 /* It was - don't register */
61 DEBUGF("(was ready)\n"
64 sh
->win_left
, sh
->win_right
,
65 disk_buf
.win_left
, disk_buf
.win_right
);
66 /* Be sure it's not listed though if multiple requests were made */
67 list_remove_item(&sh
->nf
);
68 return DISK_BUF_NOTIFY_OK
;
71 switch (disk_buf
.state
)
74 case TSTATE_BUFFERING
:
76 disk_buf
.state
= TSTATE_BUFFERING
;
77 list_add_item(&nf_list
, &sh
->nf
);
78 DEBUGF("(registered)\n"
81 sh
->win_left
, sh
->win_right
,
82 disk_buf
.win_left
, disk_buf
.win_right
);
83 return DISK_BUF_NOTIFY_REGISTERED
;
88 return DISK_BUF_NOTIFY_ERROR
;
91 static bool check_data_notifies_callback(struct list_item
*item
,
94 struct stream_hdr
*sh
= TYPE_FROM_MEMBER(struct stream_hdr
, item
, nf
);
96 if (disk_buf_is_data_ready(sh
, 0))
98 /* Remove from list then post notification - post because send
99 * could result in a wait for each thread to finish resulting
101 list_remove_item(item
);
102 str_post_msg(STR_FROM_HEADER(sh
), DISK_BUF_DATA_NOTIFY
, 0);
103 DEBUGF("DISK_BUF_DATA_NOTIFY: 0x%02X (notified)\n",
104 STR_FROM_HEADER(sh
)->id
);
111 /* Check registered streams and notify them if their data is available */
112 static void check_data_notifies(void)
114 list_enum_items(&nf_list
, check_data_notifies_callback
, 0);
117 /* Clear all registered notifications - do not post them */
118 static inline void clear_data_notifies(void)
120 list_clear_all(&nf_list
);
123 /* Background buffering when streaming */
124 static inline void disk_buf_buffer(void)
126 struct stream_window sw
;
128 switch (disk_buf
.state
)
135 /* Get remaining minimum data based upon the stream closest to the
136 * right edge of the window */
137 if (!stream_get_window(&sw
))
140 time
= stream_get_ticks(NULL
);
141 wm
= muldiv_uint32(5*CLOCK_RATE
, sw
.right
- disk_buf
.pos_last
,
142 time
- disk_buf
.time_last
);
143 wm
= MIN(wm
, (size_t)disk_buf
.size
);
144 wm
= MAX(wm
, DISK_BUF_LOW_WATERMARK
);
146 disk_buf
.time_last
= time
;
147 disk_buf
.pos_last
= sw
.right
;
149 /* Fast attack, slow decay */
150 disk_buf
.low_wm
= (wm
> (size_t)disk_buf
.low_wm
) ?
151 wm
: AVERAGE(disk_buf
.low_wm
, wm
, 16);
154 rb
->splash(0, "*%10ld %10ld", disk_buf
.low_wm
,
155 disk_buf
.win_right
- sw
.right
);
158 if (disk_buf
.win_right
- sw
.right
> disk_buf
.low_wm
)
161 disk_buf
.state
= TSTATE_BUFFERING
;
165 case TSTATE_BUFFERING
:
168 uint32_t tag
, *tag_p
;
170 /* Limit buffering up to the stream with the least progress */
171 if (!stream_get_window(&sw
))
173 disk_buf
.state
= TSTATE_DATA
;
179 if (disk_buf
.tail
>= disk_buf
.end
)
180 disk_buf
.tail
= disk_buf
.start
;
182 len
= disk_buf
.size
- disk_buf
.win_right
+ sw
.left
;
184 if (len
< DISK_BUF_PAGE_SIZE
)
186 /* Free space is less than one page */
187 disk_buf
.state
= TSTATE_DATA
;
188 disk_buf
.low_wm
= DISK_BUF_LOW_WATERMARK
;
193 len
= disk_buf
.tail
- disk_buf
.start
;
194 tag
= MAP_OFFSET_TO_TAG(disk_buf
.win_right
);
195 tag_p
= &disk_buf
.cache
[len
>> DISK_BUF_PAGE_SHIFT
];
199 if (disk_buf
.need_seek
)
201 rb
->lseek(disk_buf
.in_file
, disk_buf
.win_right
, SEEK_SET
);
202 disk_buf
.need_seek
= false;
205 n
= rb
->read(disk_buf
.in_file
, disk_buf
.tail
, DISK_BUF_PAGE_SIZE
);
209 /* Error or end of stream */
210 disk_buf
.state
= TSTATE_EOS
;
215 if (len
< DISK_GUARDBUF_SIZE
)
217 /* Autoguard guard-o-rama - maintain guardbuffer coherency */
218 rb
->memcpy(disk_buf
.end
+ len
, disk_buf
.tail
,
219 MIN(DISK_GUARDBUF_SIZE
- len
, n
));
222 /* Update the cache entry for this page */
227 /* Skipping a read */
228 n
= MIN(DISK_BUF_PAGE_SIZE
,
229 disk_buf
.filesize
- disk_buf
.win_right
);
230 disk_buf
.need_seek
= true;
233 disk_buf
.tail
+= DISK_BUF_PAGE_SIZE
;
235 /* Keep left edge moving forward */
237 /* Advance right edge in temp variable first, then move
238 * left edge if overflow would occur. This avoids a stream
239 * thinking its data might be available when it actually
240 * may not end up that way after a slide of the window. */
241 len
= disk_buf
.win_right
+ n
;
243 if (len
- disk_buf
.win_left
> disk_buf
.size
)
244 disk_buf
.win_left
+= n
;
246 disk_buf
.win_right
= len
;
248 /* Continue buffering until filled or file end */
250 } /* TSTATE_BUFFERING: */
257 static void disk_buf_on_reset(ssize_t pos
)
263 disk_buf
.state
= TSTATE_INIT
;
264 disk_buf
.status
= STREAM_STOPPED
;
265 clear_data_notifies();
267 if (pos
>= disk_buf
.filesize
)
269 /* Anchor on page immediately following the one containing final
271 anchor
= disk_buf
.file_pages
* DISK_BUF_PAGE_SIZE
;
272 disk_buf
.win_left
= disk_buf
.filesize
;
276 anchor
= pos
& ~DISK_BUF_PAGE_MASK
;
277 disk_buf
.win_left
= anchor
;
280 /* Collect all valid data already buffered that is contiguous with the
281 * current position - probe to left, then to right */
284 page
= MAP_OFFSET_TO_PAGE(anchor
);
285 tag
= MAP_OFFSET_TO_TAG(anchor
);
289 if (--tag
, --page
< 0)
290 page
= disk_buf
.pgcount
- 1;
292 if (disk_buf
.cache
[page
] != tag
)
295 disk_buf
.win_left
= tag
<< DISK_BUF_PAGE_SHIFT
;
300 if (anchor
< disk_buf
.filesize
)
302 page
= MAP_OFFSET_TO_PAGE(anchor
);
303 tag
= MAP_OFFSET_TO_TAG(anchor
);
307 if (disk_buf
.cache
[page
] != tag
)
310 if (++tag
, ++page
>= disk_buf
.pgcount
)
313 anchor
+= DISK_BUF_PAGE_SIZE
;
315 while (anchor
< disk_buf
.filesize
);
318 if (anchor
>= disk_buf
.filesize
)
320 disk_buf
.win_right
= disk_buf
.filesize
;
321 disk_buf
.state
= TSTATE_EOS
;
325 disk_buf
.win_right
= anchor
;
328 disk_buf
.tail
= disk_buf
.start
+ MAP_OFFSET_TO_BUFFER(anchor
);
330 DEBUGF("disk buf reset\n"
331 " dwl:%ld dwr:%ld\n",
332 disk_buf
.win_left
, disk_buf
.win_right
);
334 /* Next read position is at right edge */
335 rb
->lseek(disk_buf
.in_file
, disk_buf
.win_right
, SEEK_SET
);
336 disk_buf
.need_seek
= false;
338 disk_buf_reply_msg(disk_buf
.win_right
- disk_buf
.win_left
);
341 static void disk_buf_on_stop(void)
343 bool was_buffering
= disk_buf
.state
== TSTATE_BUFFERING
;
345 disk_buf
.state
= TSTATE_EOS
;
346 disk_buf
.status
= STREAM_STOPPED
;
347 clear_data_notifies();
349 disk_buf_reply_msg(was_buffering
);
352 static void disk_buf_on_play_pause(bool play
, bool forcefill
)
354 struct stream_window sw
;
356 if (disk_buf
.state
!= TSTATE_EOS
)
360 /* Force buffer filling to top */
361 disk_buf
.state
= TSTATE_BUFFERING
;
363 else if (disk_buf
.state
!= TSTATE_BUFFERING
)
365 /* If not filling already, simply monitor */
366 disk_buf
.state
= TSTATE_DATA
;
369 /* else end of stream - no buffering to do */
371 disk_buf
.pos_last
= stream_get_window(&sw
) ? sw
.right
: 0;
372 disk_buf
.time_last
= stream_get_ticks(NULL
);
374 disk_buf
.status
= play
? STREAM_PLAYING
: STREAM_PAUSED
;
377 static int disk_buf_on_load_range(struct dbuf_range
*rng
)
379 uint32_t tag
= rng
->tag_start
;
380 uint32_t tag_end
= rng
->tag_end
;
381 int page
= rng
->pg_start
;
383 /* Check if a seek is required */
384 bool need_seek
= rb
->lseek(disk_buf
.in_file
, 0, SEEK_CUR
)
385 != (off_t
)(tag
<< DISK_BUF_PAGE_SHIFT
);
389 uint32_t *tag_p
= &disk_buf
.cache
[page
];
393 /* Page not cached - load it */
398 rb
->lseek(disk_buf
.in_file
, tag
<< DISK_BUF_PAGE_SHIFT
,
403 o
= page
<< DISK_BUF_PAGE_SHIFT
;
404 n
= rb
->read(disk_buf
.in_file
, disk_buf
.start
+ o
,
410 return DISK_BUF_NOTIFY_ERROR
;
419 if (o
< DISK_GUARDBUF_SIZE
)
421 /* Autoguard guard-o-rama - maintain guardbuffer coherency */
422 rb
->memcpy(disk_buf
.end
+ o
, disk_buf
.start
+ o
,
423 MIN(DISK_GUARDBUF_SIZE
- o
, n
));
426 /* Update the cache entry */
431 /* Skipping a disk read - must seek on next one */
435 if (++page
>= disk_buf
.pgcount
)
438 while (++tag
<= tag_end
);
440 return DISK_BUF_NOTIFY_OK
;
443 static void disk_buf_thread(void)
445 struct queue_event ev
;
447 disk_buf
.state
= TSTATE_EOS
;
448 disk_buf
.status
= STREAM_STOPPED
;
452 if (disk_buf
.state
!= TSTATE_EOS
)
454 /* Poll buffer status and messages */
455 rb
->queue_wait_w_tmo(disk_buf
.q
, &ev
,
456 disk_buf
.state
== TSTATE_BUFFERING
?
461 /* Sit idle and wait for commands */
462 rb
->queue_wait(disk_buf
.q
, &ev
);
468 if (disk_buf
.state
== TSTATE_EOS
)
473 /* Check for any due notifications if any are pending */
474 if (nf_list
.next
!= NULL
)
475 check_data_notifies();
477 /* Still more data left? */
478 if (disk_buf
.state
!= TSTATE_EOS
)
481 /* Nope - end of stream */
484 case DISK_BUF_CACHE_RANGE
:
485 disk_buf_reply_msg(disk_buf_on_load_range(
486 (struct dbuf_range
*)ev
.data
));
490 disk_buf_on_reset(ev
.data
);
499 disk_buf_on_play_pause(ev
.id
== STREAM_PLAY
, ev
.data
!= 0);
500 disk_buf_reply_msg(1);
504 disk_buf
.state
= TSTATE_EOS
;
507 case DISK_BUF_DATA_NOTIFY
:
508 disk_buf_reply_msg(disk_buf_on_data_notify(
509 (struct stream_hdr
*)ev
.data
));
512 case DISK_BUF_CLEAR_DATA_NOTIFY
:
513 disk_buf_on_clear_data_notify((struct stream_hdr
*)ev
.data
);
514 disk_buf_reply_msg(1);
520 /* Caches some data from the current file */
521 static int disk_buf_probe(off_t start
, size_t length
,
522 void **p
, size_t *outlen
)
525 uint32_t tag
, tag_end
;
528 /* Can't read past end of file */
529 if (length
> (size_t)(disk_buf
.filesize
- disk_buf
.offset
))
531 length
= disk_buf
.filesize
- disk_buf
.offset
;
534 /* Can't cache more than the whole buffer size */
535 if (length
> (size_t)disk_buf
.size
)
537 length
= disk_buf
.size
;
539 /* Zero-length probes permitted */
541 end
= start
+ length
;
543 /* Prepare the range probe */
544 tag
= MAP_OFFSET_TO_TAG(start
);
545 tag_end
= MAP_OFFSET_TO_TAG(end
);
546 page
= MAP_OFFSET_TO_PAGE(start
);
548 /* If the end is on a page boundary, check one less or an extra
549 * one will be probed */
550 if (tag_end
> tag
&& (end
& DISK_BUF_PAGE_MASK
) == 0)
557 *p
= disk_buf
.start
+ (page
<< DISK_BUF_PAGE_SHIFT
)
558 + (start
& DISK_BUF_PAGE_MASK
);
566 /* Obtain initial load point. If all data was cached, no message is sent
567 * otherwise begin on the first page that is not cached. Since we have to
568 * send the message anyway, the buffering thread will determine what else
569 * requires loading on its end in order to cache the specified range. */
572 if (disk_buf
.cache
[page
] != tag
)
574 static struct dbuf_range rng IBSS_ATTR
;
575 DEBUGF("disk_buf: cache miss\n");
577 rng
.tag_end
= tag_end
;
579 return rb
->queue_send(disk_buf
.q
, DISK_BUF_CACHE_RANGE
,
583 if (++page
>= disk_buf
.pgcount
)
586 while (++tag
<= tag_end
);
588 return DISK_BUF_NOTIFY_OK
;
591 /* Attempt to get a pointer to size bytes on the buffer. Returns real amount of
592 * data available as well as the size of non-wrapped data after *p. */
593 ssize_t
_disk_buf_getbuffer(size_t size
, void **pp
, void **pwrap
, size_t *sizewrap
)
597 if (disk_buf_probe(disk_buf
.offset
, size
, pp
, &size
) == DISK_BUF_NOTIFY_OK
)
599 if (pwrap
&& sizewrap
)
601 uint8_t *p
= (uint8_t *)*pp
;
603 if (p
+ size
> disk_buf
.end
+ DISK_GUARDBUF_SIZE
)
605 /* Return pointer to wraparound and the size of same */
606 size_t nowrap
= (disk_buf
.end
+ DISK_GUARDBUF_SIZE
) - p
;
607 *pwrap
= disk_buf
.start
+ DISK_GUARDBUF_SIZE
;
608 *sizewrap
= size
- nowrap
;
627 /* Read size bytes of data into a buffer - advances the buffer pointer
628 * and returns the real size read. */
629 ssize_t
disk_buf_read(void *buffer
, size_t size
)
635 if (disk_buf_probe(disk_buf
.offset
, size
, PUN_PTR(void **, &p
),
636 &size
) == DISK_BUF_NOTIFY_OK
)
638 if (p
+ size
> disk_buf
.end
+ DISK_GUARDBUF_SIZE
)
641 size_t nowrap
= (disk_buf
.end
+ DISK_GUARDBUF_SIZE
) - p
;
642 rb
->memcpy(buffer
, p
, nowrap
);
643 rb
->memcpy(buffer
+ nowrap
, disk_buf
.start
+ DISK_GUARDBUF_SIZE
,
648 /* Read wasn't wrapped or guardbuffer holds it */
649 rb
->memcpy(buffer
, p
, size
);
652 disk_buf
.offset
+= size
;
664 off_t
disk_buf_lseek(off_t offset
, int whence
)
668 /* The offset returned is the result of the current thread's action and
669 * may be invalidated so a local result is returned and not the value
670 * of disk_buf.offset directly */
674 /* offset is just the offset */
677 offset
+= disk_buf
.offset
;
680 offset
= disk_buf
.filesize
+ offset
;
684 return -2; /* Invalid request */
687 if (offset
< 0 || offset
> disk_buf
.filesize
)
693 disk_buf
.offset
= offset
;
701 /* Prepare the buffer to enter the streaming state. Evaluates the available
702 * streaming window. */
703 ssize_t
disk_buf_prepare_streaming(off_t pos
, size_t len
)
709 else if (pos
> disk_buf
.filesize
)
710 pos
= disk_buf
.filesize
;
712 DEBUGF("prepare streaming:\n pos:%ld len:%lu\n", pos
, len
);
714 pos
= disk_buf_lseek(pos
, SEEK_SET
);
715 disk_buf_probe(pos
, len
, NULL
, &len
);
717 DEBUGF(" probe done: pos:%ld len:%lu\n", pos
, len
);
719 len
= disk_buf_send_msg(STREAM_RESET
, pos
);
726 /* Set the streaming window to an arbitrary position within the file. Makes no
727 * probes to validate data. Use after calling another function to cause data
728 * to be cached and correct values are known. */
729 ssize_t
disk_buf_set_streaming_window(off_t left
, off_t right
)
737 else if (left
> disk_buf
.filesize
)
738 left
= disk_buf
.filesize
;
743 if (right
> disk_buf
.filesize
)
744 right
= disk_buf
.filesize
;
746 disk_buf
.win_left
= left
;
747 disk_buf
.win_right
= right
;
748 disk_buf
.tail
= disk_buf
.start
+ ((right
+ DISK_BUF_PAGE_SIZE
-1) &
749 ~DISK_BUF_PAGE_MASK
) % disk_buf
.size
;
751 len
= disk_buf
.win_right
- disk_buf
.win_left
;
758 void * disk_buf_offset2ptr(off_t offset
)
762 else if (offset
> disk_buf
.filesize
)
763 offset
= disk_buf
.filesize
;
765 return disk_buf
.start
+ (offset
% disk_buf
.size
);
768 void disk_buf_close(void)
772 if (disk_buf
.in_file
>= 0)
774 rb
->close(disk_buf
.in_file
);
775 disk_buf
.in_file
= -1;
777 /* Invalidate entire cache */
778 rb
->memset(disk_buf
.cache
, 0xff,
779 disk_buf
.pgcount
*sizeof (*disk_buf
.cache
));
780 disk_buf
.file_pages
= 0;
781 disk_buf
.filesize
= 0;
788 int disk_buf_open(const char *filename
)
796 fd
= rb
->open(filename
, O_RDONLY
);
800 ssize_t filesize
= rb
->filesize(fd
);
804 rb
->close(disk_buf
.in_file
);
808 disk_buf
.filesize
= filesize
;
809 /* Number of file pages rounded up toward +inf */
810 disk_buf
.file_pages
= ((size_t)filesize
+ DISK_BUF_PAGE_SIZE
-1)
811 / DISK_BUF_PAGE_SIZE
;
812 disk_buf
.in_file
= fd
;
821 intptr_t disk_buf_send_msg(long id
, intptr_t data
)
823 return rb
->queue_send(disk_buf
.q
, id
, data
);
826 void disk_buf_post_msg(long id
, intptr_t data
)
828 rb
->queue_post(disk_buf
.q
, id
, data
);
831 void disk_buf_reply_msg(intptr_t retval
)
833 rb
->queue_reply(disk_buf
.q
, retval
);
836 bool disk_buf_init(void)
838 disk_buf
.thread
= NULL
;
839 list_initialize(&nf_list
);
841 rb
->mutex_init(&disk_buf_mtx
);
843 disk_buf
.q
= &disk_buf_queue
;
844 rb
->queue_init(disk_buf
.q
, false);
846 disk_buf
.state
= TSTATE_EOS
;
847 disk_buf
.status
= STREAM_STOPPED
;
849 disk_buf
.in_file
= -1;
850 disk_buf
.filesize
= 0;
851 disk_buf
.win_left
= 0;
852 disk_buf
.win_right
= 0;
853 disk_buf
.time_last
= 0;
854 disk_buf
.pos_last
= 0;
855 disk_buf
.low_wm
= DISK_BUF_LOW_WATERMARK
;
857 disk_buf
.start
= mpeg_malloc_all(&disk_buf
.size
, MPEG_ALLOC_DISKBUF
);
858 if (disk_buf
.start
== NULL
)
861 #ifdef PROC_NEEDS_CACHEALIGN
862 disk_buf
.size
= CACHEALIGN_BUFFER(&disk_buf
.start
, disk_buf
.size
);
863 disk_buf
.start
= UNCACHED_ADDR(disk_buf
.start
);
865 disk_buf
.size
-= DISK_GUARDBUF_SIZE
;
866 disk_buf
.pgcount
= disk_buf
.size
/ DISK_BUF_PAGE_SIZE
;
868 /* Fit it as tightly as possible */
869 while (disk_buf
.pgcount
*(sizeof (*disk_buf
.cache
) + DISK_BUF_PAGE_SIZE
)
870 > (size_t)disk_buf
.size
)
875 disk_buf
.cache
= (typeof (disk_buf
.cache
))disk_buf
.start
;
876 disk_buf
.start
+= sizeof (*disk_buf
.cache
)*disk_buf
.pgcount
;
877 disk_buf
.size
= disk_buf
.pgcount
*DISK_BUF_PAGE_SIZE
;
878 disk_buf
.end
= disk_buf
.start
+ disk_buf
.size
;
879 disk_buf
.tail
= disk_buf
.start
;
881 DEBUGF("disk_buf info:\n"
884 disk_buf
.pgcount
, disk_buf
.size
);
886 rb
->memset(disk_buf
.cache
, 0xff,
887 disk_buf
.pgcount
*sizeof (*disk_buf
.cache
));
889 disk_buf
.thread
= rb
->create_thread(
890 disk_buf_thread
, disk_buf_stack
, sizeof(disk_buf_stack
), 0,
891 "mpgbuffer" IF_PRIO(, PRIORITY_BUFFERING
) IF_COP(, CPU
));
893 rb
->queue_enable_queue_send(disk_buf
.q
, &disk_buf_queue_send
,
896 if (disk_buf
.thread
== NULL
)
899 /* Wait for thread to initialize */
900 disk_buf_send_msg(STREAM_NULL
, 0);
905 void disk_buf_exit(void)
907 if (disk_buf
.thread
!= NULL
)
909 rb
->queue_post(disk_buf
.q
, STREAM_QUIT
, 0);
910 rb
->thread_wait(disk_buf
.thread
);
911 disk_buf
.thread
= NULL
;