From 6f6415a7fee0b683fe098493e18bde613ac96d0e Mon Sep 17 00:00:00 2001 From: nico Date: Sat, 7 Jul 2007 16:11:24 +0000 Subject: [PATCH] Beginnings of threading. There is a playback thread and the plugin main function will emulate the buffering thread. The buf*() API calls post messages to the buffering thread, that will treat them in sequence. The code compiles with a lot of warnings and still needs a lot of work. It probably wouldn't work at all but I haven't tested it. git-svn-id: svn://jdgordon.mine.nu/mob@38 9862a28c-4e93-4879-ac26-10afcf840a8f --- testplugin.c | 383 ++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 284 insertions(+), 99 deletions(-) diff --git a/testplugin.c b/testplugin.c index c018dd1..da1956f 100644 --- a/testplugin.c +++ b/testplugin.c @@ -60,6 +60,43 @@ struct memory_handle { struct memory_handle *next; }; +enum { + Q_BUFOPEN, + Q_BUFCLOSE, + Q_BUFSEEK, + Q_BUFADVANCE, + Q_BUFREAD, + Q_BUFGETDATA +}; + +struct bufopen_data { + char *file; + size_t offset; +}; + +struct bufseek_data { + int handle_id; + size_t offset; +}; + +struct bufadvance_data { + int handle_id; + ssize_t offset; +}; + +struct bufread_data { + int handle_id; + size_t size; + char *dest; +}; + +struct bufgetdata_data { + int handle_id; + size_t size; + unsigned char **data; +}; + + static char *buffer; static char *guard_buffer; @@ -77,6 +114,14 @@ static int num_handles; static void graph_view(int width); +/* exported API functions (will be present in the header file) */ +int bufopen(char *file, size_t offset); +int bufclose(int handle_id); +int bufseek(int handle_id, size_t offset); +int bufadvance(int handle_id, ssize_t offset); +int bufread(int handle_id, size_t size, char *dest); +long bufgetdata(int handle_id, size_t size, unsigned char **data); + /* add a new handle to the linked list and return it. It will have become the new current handle */ @@ -122,6 +167,7 @@ static bool rm_handle(struct memory_handle *h) first_handle = h->next; if (h == cur_handle) { DEBUGF("removing the first and last handle\n"); + /* h was the first and last handle */ cur_handle = NULL; buf_ridx = buf_widx; } else { @@ -188,7 +234,7 @@ static struct memory_handle *move_handle(size_t newpos, struct memory_handle *h) /* make sure newpos is 32-bit aligned so that the handle struct is. */ newpos = (RINGBUF_ADD(newpos, 3)) & ~3; - DEBUGF("move_handle"); + DEBUGF("move_handle\n"); struct memory_handle *dest = (struct memory_handle *)(buffer + newpos); /* Invalidate the cache to prevent it from keeping the old location of h */ @@ -328,12 +374,8 @@ static void free_buffer(int handle_id) RETURNS: <0 if the file cannot be opened, or one file already queued to be opened, otherwise the handle for the file in the buffer */ -int bufopen(char *file, size_t offset) +static int buffering_bufopen(char *file, size_t offset) { - /* add the file to the buffering queue. */ - /* for now, we'll assume the queue is always empty, so the handle - gets added immediately */ - DEBUGF("bufopen: %s (offset: %d)\n", file, offset); int fd = rb->open(file, O_RDONLY); @@ -361,7 +403,7 @@ int bufopen(char *file, size_t offset) } /* Close the handle. Return 0 for success and < 0 for failure */ -int bufclose(int handle_id) +static int buffering_bufclose(int handle_id) { DEBUGF("bufclose(%d)\n", handle_id); struct memory_handle *h = find_handle(handle_id); @@ -374,7 +416,7 @@ int bufclose(int handle_id) /* Set the reading index in a handle (relatively to the start of the handle data). Return 0 for success and < 0 for failure */ -int bufseek(int handle_id, size_t offset) +static int buffering_bufseek(int handle_id, size_t offset) { struct memory_handle *h = find_handle(handle_id); if (!h) @@ -389,7 +431,7 @@ int bufseek(int handle_id, size_t offset) /* Advance the reading index in a handle (relatively to its current position). Return 0 for success and < 0 for failure */ -int bufadvance(int handle_id, ssize_t offset) +static int buffering_bufadvance(int handle_id, ssize_t offset) { struct memory_handle *h = find_handle(handle_id); if (!h) @@ -415,7 +457,7 @@ int bufadvance(int handle_id, ssize_t offset) /* Copy data from the given handle to the dest buffer. Return the number of bytes copied or -1 for failure. */ -int bufread(int handle_id, size_t size, char *dest) +static int buffering_bufread(int handle_id, size_t size, char *dest) { struct memory_handle *h = find_handle(handle_id); size_t buffered_data; @@ -440,7 +482,7 @@ int bufread(int handle_id, size_t size, char *dest) /* Update the "data" pointer to make the handle's data available to the caller. Return the length of the available linear data or -1 for failure. */ -long bufgetdata(int handle_id, size_t size, unsigned char **data) +static long buffering_bufgetdata(int handle_id, size_t size, unsigned char **data) { struct memory_handle *h = find_handle(handle_id); if (!h) @@ -663,7 +705,81 @@ bool disk_is_spinning(void) return true; } + + +static long playback_stack[DEFAULT_STACK_SIZE/sizeof(long)]; +static struct thread_entry* playbackthread_id; + +/* Events */ +static struct event_queue buffering_queue; +static struct queue_sender_list buffering_queue_sender_list; + +bool done_buffering = false; +bool done_playing = false; + + #define MAX_HANDLES 64 +static int handle_order[MAX_HANDLES]; +static int last_handle = -1; + +void playback_thread(void) +{ + int reading_handle = 0; + int fd = -1; /* used to write the files out as they are read */ + unsigned char *data; + + /* "Playback thread" section */ + while (1) + { + DEBUGF("reading handle: %d\n", handle_order[reading_handle]); + long read; + long total = 0; + char file[MAX_PATH]; + if (reading_handle >= last_handle + && !handle_has_data(handle_order[reading_handle])) + done_playing = true; + + if (fd < 0) + { + rb->snprintf(file, MAX_PATH, "/file%d.mp3", reading_handle); + fd = rb->open(file, O_CREAT|O_TRUNC|O_WRONLY); + if (fd < 0) + { + DEBUGF("ERROROROROR\n"); + rb->splash(HZ, "couldn't create file"); + } + } + do + { + //read = bufread(handle_order[reading_handle], GUARD_SIZE,read_buffer); + //write(fd, read_buffer, read); + read = bufgetdata(handle_order[reading_handle], GUARD_SIZE, &data); + read = MIN(read, GUARD_SIZE); + rb->write(fd, data, read); + total += read; + bufadvance(handle_order[reading_handle], read); + rb->sleep(HZ/5); + } + while (read > 0); + + DEBUGF("read %ld bytes from handle %d\n", total, handle_order[reading_handle]); + + /* close the fd/handle if there is no more data or an error */ + if (read < 0 || handle_has_data(handle_order[reading_handle]) == false) + { + DEBUGF("finished reading %d\n",handle_order[reading_handle]); + bufclose(handle_order[reading_handle]); + rb->close(fd); + reading_handle++; + fd = -1; + } + else + { + DEBUGF("there is data left to buffer for %d\n", handle_order[reading_handle]); + } + } +} + /* this is the plugin entry point */ enum plugin_status plugin_start(struct plugin_api* api, void* parameter) @@ -680,15 +796,11 @@ enum plugin_status plugin_start(struct plugin_api* api, void* parameter) "/Antena - Camino del Sol (Joakim Remix).mp3" }; int next_file = 1; - int last_handle = -1; - int handle_order[MAX_HANDLES]; - int reading_handle = 0; - bool done_buffering = false, done_playing = false; - //char read_buffer[GUARD_SIZE]; - unsigned char *data; - int fd = -1; /* used to write the files out as they are read */ int current_handle = -1; struct memory_handle *m = NULL; + + struct event ev; + buffer_init(); if (!test_ll()) @@ -698,114 +810,187 @@ enum plugin_status plugin_start(struct plugin_api* api, void* parameter) return PLUGIN_ERROR; } - buffer_init(); - - DEBUGF("test\n"); + /* Msg queue init - no need for queue_remove since it's not a registered + queue */ + rb->queue_init( &buffering_queue, false ); - while (!done_buffering || !done_playing) + playbackthread_id = rb->create_thread(playback_thread, + playback_stack, + sizeof(playback_stack), + "playback" + IF_PRIO(,PRIORITY_PLAYBACK) + IF_COP(, CPU, false)); + + if (!playbackthread_id) + { + rb->splash(HZ, "failed to create playback thread"); + return PLUGIN_ERROR; + } + else { - DEBUGF("buffer usage: %d handles_used: %d\n", BUF_USED, num_handles); -#ifndef ROCKBOX_HAS_LOGF - graph_view(100); -#endif - /* "Buffering thread" section */ - if (!done_buffering && need_rebuffer() && disk_is_spinning()) + while (!done_buffering || !done_playing) { - m = find_handle(current_handle); - if ( !m || ((m->filerem == 0) && (m->next == NULL))) + /* + DEBUGF("buffer usage: %d handles_used: %d\n", BUF_USED, num_handles); +#ifndef ROCKBOX_HAS_LOGF + graph_view(100); +#endif*/ + + rb->queue_wait_w_tmo(&buffering_queue, &ev, HZ/2); + + switch (ev.id) { - int h = bufopen(argv[next_file], 0); - m = find_handle(h); - if (h >= 0) + case Q_BUFOPEN: { - next_file++; - //DEBUGF("new handle %d\n",h); - last_handle++; - handle_order[last_handle] = h; - buffer_handle(m->id); + struct bufopen_data *data; + data = (struct bufopen *)ev.data; + buffering_bufopen(data->file, data->offset); + break; } - else + + case Q_BUFCLOSE: { - DEBUGF("error\n"); + buffering_bufclose((int)ev.data); + break; } - current_handle = h; - } - else - { - if (m->filerem == 0) + + case Q_BUFSEEK: { - m = m->next; - current_handle = m?m->id:-1; + struct bufseek_data *data; + data = (struct bufseek_data *)ev.data; + buffering_bufseek(data->handle_id, data->offset); + break; } - if (m) + + case Q_BUFADVANCE: { - DEBUGF("buffering handle %d\n",m->id); - buffer_handle(m->id); + struct bufadvance_data *data; + data = (struct bufadvance_data *)ev.data; + buffering_bufadvance(data->handle_id, data->offset); + break; } - } - if (next_file == argc && m->filerem == 0) - done_buffering = true; - } - /* "Playback thread" section */ - else - { - DEBUGF("reading handle: %d\n", handle_order[reading_handle]); - long read; - long total = 0; - char file[MAX_PATH]; - if (reading_handle >= last_handle - && !handle_has_data(handle_order[reading_handle])) - done_playing = true; + case Q_BUFREAD: + { + struct bufread_data *data; + data = (struct bufread_data *)ev.data; + buffering_bufread(data->handle_id, data->size, data->dest); + break; + } - if (fd < 0) - { - rb->snprintf(file, MAX_PATH, "/file%d.mp3", reading_handle); - fd = rb->open(file, O_CREAT|O_TRUNC|O_WRONLY); - if (fd < 0) + case Q_BUFGETDATA: { - DEBUGF("ERROROROROR\n"); - rb->splash(HZ, "couldn't create file"); - return PLUGIN_ERROR; + struct bufgetdata_data *data; + data = (struct bufgetdata_data *)ev.data; + buffering_bufgetdata(data->handle_id, data->size, data->data); + break; } } - do - { - //read = bufread(handle_order[reading_handle], GUARD_SIZE,read_buffer); - //write(fd, read_buffer, read); - read = bufgetdata(handle_order[reading_handle], GUARD_SIZE, &data); - read = MIN(read, GUARD_SIZE); - rb->write(fd, data, read); - total += read; - bufadvance(handle_order[reading_handle], read); - } - while (read > 0); - DEBUGF("read %ld bytes from handle %d\n", total, handle_order[reading_handle]); - - /* close the fd/handle if there is no more data or an error */ - if (read < 0 || handle_has_data(handle_order[reading_handle]) == false) + /* "Buffering thread" section */ + if (!done_buffering && need_rebuffer() && disk_is_spinning()) { - DEBUGF("finished reading %d\n",handle_order[reading_handle]); - bufclose(handle_order[reading_handle]); - rb->close(fd); - reading_handle++; - fd = -1; + m = find_handle(current_handle); + if ( !m || ((m->filerem == 0) && (m->next == NULL))) + { + int h = bufopen(argv[next_file], 0); + m = find_handle(h); + if (h >= 0) + { + next_file++; + //DEBUGF("new handle %d\n",h); + last_handle++; + handle_order[last_handle] = h; + buffer_handle(m->id); + } + else + { + DEBUGF("error\n"); + } + current_handle = h; + } + else + { + if (m->filerem == 0) + { + m = m->next; + current_handle = m?m->id:-1; + } + if (m) + { + DEBUGF("buffering handle %d\n",m->id); + buffer_handle(m->id); + } + } + + if (next_file == argc && m->filerem == 0) + done_buffering = true; } else { - DEBUGF("there is data left to buffer for %d\n", handle_order[reading_handle]); + rb->sleep(HZ/10); } } +/* + DEBUGF("buffer usage: %d handles_used: %d\n", BUF_USED,num_handles); +#ifndef ROCKBOX_HAS_LOGF + graph_view(100); +#endif*/ } - DEBUGF("buffer usage: %d handles_used: %d\n", BUF_USED,num_handles); -#ifndef ROCKBOX_HAS_LOGF - graph_view(100); -#endif + return PLUGIN_OK; +} - return 0; - return PLUGIN_OK; +int bufopen(char *file, size_t offset) +{ + static struct bufopen_data data; + data.file = file; + data.offset = offset; + + rb->queue_post(&buffering_queue, Q_BUFOPEN, &data); +} + +int bufclose(int handle_id) +{ + rb->queue_post(&buffering_queue, Q_BUFCLOSE, handle_id); +} + +int bufseek(int handle_id, size_t offset) +{ + static struct bufseek_data data; + data.handle_id = handle_id; + data.offset = offset; + + rb->queue_post(&buffering_queue, Q_BUFSEEK, &data); +} + +int bufadvance(int handle_id, ssize_t offset) +{ + static struct bufadvance_data data; + data.handle_id = handle_id; + data.offset = offset; + + rb->queue_post(&buffering_queue, Q_BUFADVANCE, &data); +} + +int bufread(int handle_id, size_t size, char *dest) +{ + static struct bufread_data data; + data.handle_id = handle_id; + data.size = size; + data.dest = dest; + + rb->queue_post(&buffering_queue, Q_BUFREAD, &data); +} + +long bufgetdata(int handle_id, size_t size, unsigned char **data) +{ + static struct bufgetdata_data argdata; + argdata.handle_id = handle_id; + argdata.size = size; + argdata.data = data; + + rb->queue_post(&buffering_queue, Q_BUFGETDATA, &argdata); } -- 2.11.4.GIT