ntdll: Use a separate memory allocation for the kernel stack.
[wine.git] / dlls / winegstreamer / wg_parser.c
blobdfb3da9a4ab26565820042ddd2a315d2bcc378a0
1 /*
2 * GStreamer parser backend
4 * Copyright 2010 Maarten Lankhorst for CodeWeavers
5 * Copyright 2010 Aric Stewart for CodeWeavers
6 * Copyright 2019-2020 Zebediah Figura
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
23 #if 0
24 #pragma makedep unix
25 #endif
27 #include "config.h"
29 #include <assert.h>
30 #include <stdarg.h>
31 #include <stdio.h>
33 #include <gst/gst.h>
34 #include <gst/video/video.h>
35 #include <gst/audio/audio.h>
36 #include <gst/tag/tag.h>
38 #include "ntstatus.h"
39 #define WIN32_NO_STATUS
40 #include "winternl.h"
41 #include "dshow.h"
43 #include "unix_private.h"
/* Result codes returned by the "autoplug-select" handler in this file
 * (see autoplug_select_cb()). */
typedef enum
{
    GST_AUTOPLUG_SELECT_TRY,
    GST_AUTOPLUG_SELECT_EXPOSE,
    GST_AUTOPLUG_SELECT_SKIP,
} GstAutoplugSelectResult;
52 typedef BOOL (*init_gst_cb)(struct wg_parser *parser);
54 struct input_cache_chunk
56 guint64 position;
57 uint8_t *data;
60 struct wg_parser
62 init_gst_cb init_gst;
64 struct wg_parser_stream **streams;
65 unsigned int stream_count;
67 GstElement *container, *decodebin;
68 GstBus *bus;
69 GstPad *my_src, *their_sink;
71 guint64 file_size, start_offset, next_offset, stop_offset;
72 guint64 next_pull_offset;
74 pthread_t push_thread;
76 pthread_mutex_t mutex;
78 pthread_cond_t init_cond;
79 bool no_more_pads, has_duration, error;
80 bool err_on, warn_on;
82 pthread_cond_t read_cond, read_done_cond;
83 struct
85 GstBuffer *buffer;
86 uint64_t offset;
87 uint32_t size;
88 bool done;
89 GstFlowReturn ret;
90 } read_request;
92 bool sink_connected;
94 bool unlimited_buffering;
96 gchar *sink_caps;
98 struct input_cache_chunk input_cache_chunks[4];
/* Size in bytes of each input cache chunk (512 KiB). */
static const unsigned int input_cache_chunk_size = 512 << 10;
102 struct wg_parser_stream
104 struct wg_parser *parser;
105 uint32_t number;
107 GstPad *their_src, *my_sink;
108 GstElement *flip, *decodebin;
109 GstSegment segment;
110 struct wg_format preferred_format, current_format, stream_format;
112 pthread_cond_t event_cond, event_empty_cond;
113 GstBuffer *buffer;
114 GstMapInfo map_info;
116 bool flushing, eos, enabled, has_caps, has_tags, has_buffer, no_more_pads;
118 uint64_t duration;
119 gchar *tags[WG_PARSER_TAG_COUNT];
122 static NTSTATUS wg_parser_get_stream_count(void *args)
124 struct wg_parser_get_stream_count_params *params = args;
126 params->count = params->parser->stream_count;
127 return S_OK;
130 static NTSTATUS wg_parser_get_stream(void *args)
132 struct wg_parser_get_stream_params *params = args;
134 params->stream = params->parser->streams[params->index];
135 return S_OK;
138 static NTSTATUS wg_parser_get_next_read_offset(void *args)
140 struct wg_parser_get_next_read_offset_params *params = args;
141 struct wg_parser *parser = params->parser;
143 pthread_mutex_lock(&parser->mutex);
145 while (parser->sink_connected && !parser->read_request.size)
146 pthread_cond_wait(&parser->read_cond, &parser->mutex);
148 if (!parser->sink_connected)
150 pthread_mutex_unlock(&parser->mutex);
151 return VFW_E_WRONG_STATE;
154 params->offset = parser->read_request.offset;
155 params->size = parser->read_request.size;
157 pthread_mutex_unlock(&parser->mutex);
158 return S_OK;
161 static NTSTATUS wg_parser_push_data(void *args)
163 const struct wg_parser_push_data_params *params = args;
164 struct wg_parser *parser = params->parser;
165 const void *data = params->data;
166 uint32_t size = params->size;
168 pthread_mutex_lock(&parser->mutex);
170 if (data)
172 if (size)
174 GstMapInfo map_info;
176 /* Note that we don't allocate the buffer until we have a size.
177 * midiparse passes a NULL buffer and a size of UINT_MAX, in an
178 * apparent attempt to read the whole input stream at once. */
179 if (!parser->read_request.buffer)
180 parser->read_request.buffer = gst_buffer_new_and_alloc(size);
181 gst_buffer_map(parser->read_request.buffer, &map_info, GST_MAP_WRITE);
182 memcpy(map_info.data, data, size);
183 gst_buffer_unmap(parser->read_request.buffer, &map_info);
184 parser->read_request.ret = GST_FLOW_OK;
186 else
188 parser->read_request.ret = GST_FLOW_EOS;
191 else
193 parser->read_request.ret = GST_FLOW_ERROR;
195 parser->read_request.done = true;
196 parser->read_request.size = 0;
198 pthread_mutex_unlock(&parser->mutex);
199 pthread_cond_signal(&parser->read_done_cond);
201 return S_OK;
204 static NTSTATUS wg_parser_stream_get_preferred_format(void *args)
206 const struct wg_parser_stream_get_preferred_format_params *params = args;
208 *params->format = params->stream->preferred_format;
209 return S_OK;
212 static NTSTATUS wg_parser_stream_enable(void *args)
214 const struct wg_parser_stream_enable_params *params = args;
215 struct wg_parser_stream *stream = params->stream;
216 const struct wg_format *format = params->format;
217 struct wg_parser *parser = stream->parser;
219 pthread_mutex_lock(&parser->mutex);
221 stream->current_format = *format;
222 stream->enabled = true;
224 pthread_mutex_unlock(&parser->mutex);
226 if (format->major_type == WG_MAJOR_TYPE_VIDEO)
228 bool flip = (format->u.video.height < 0);
230 gst_util_set_object_arg(G_OBJECT(stream->flip), "method", flip ? "vertical-flip" : "none");
233 gst_pad_push_event(stream->my_sink, gst_event_new_reconfigure());
234 return S_OK;
237 static NTSTATUS wg_parser_stream_disable(void *args)
239 struct wg_parser_stream *stream = args;
240 struct wg_parser *parser = stream->parser;
242 pthread_mutex_lock(&parser->mutex);
243 stream->enabled = false;
244 stream->current_format.major_type = WG_MAJOR_TYPE_UNKNOWN;
245 pthread_mutex_unlock(&parser->mutex);
246 pthread_cond_signal(&stream->event_empty_cond);
247 return S_OK;
250 static GstBuffer *wait_parser_stream_buffer(struct wg_parser *parser, struct wg_parser_stream *stream)
252 GstBuffer *buffer = NULL;
254 /* Note that we can both have a buffer and stream->eos, in which case we
255 * must return the buffer. */
257 while (stream->enabled && !(buffer = stream->buffer) && !stream->eos)
258 pthread_cond_wait(&stream->event_cond, &parser->mutex);
260 return buffer;
263 static NTSTATUS wg_parser_stream_get_buffer(void *args)
265 const struct wg_parser_stream_get_buffer_params *params = args;
266 struct wg_parser_buffer *wg_buffer = params->buffer;
267 struct wg_parser_stream *stream = params->stream;
268 struct wg_parser *parser = params->parser;
269 GstBuffer *buffer;
270 unsigned int i;
272 pthread_mutex_lock(&parser->mutex);
274 if (stream)
275 buffer = wait_parser_stream_buffer(parser, stream);
276 else
278 /* Find the earliest buffer by PTS.
280 * Native seems to behave similarly to this with the wm async reader, although our
281 * unit tests show that it's not entirely consistent—some frames are received
282 * slightly out of order. It's possible that one stream is being manually offset
283 * to account for decoding latency.
285 * The behaviour with the wm sync reader, when stream 0 is requested, seems
286 * consistent with this hypothesis, but with a much larger offset—the video
287 * stream seems to be "behind" by about 150 ms.
289 * The main reason for doing this is that the video and audio stream probably
290 * don't have quite the same "frame rate", and we don't want to force one stream
291 * to decode faster just to keep up with the other. Delivering samples in PTS
292 * order should avoid that problem. */
293 GstBuffer *earliest = NULL;
295 for (i = 0; i < parser->stream_count; ++i)
297 if (!(buffer = wait_parser_stream_buffer(parser, parser->streams[i])))
298 continue;
299 /* invalid PTS is GST_CLOCK_TIME_NONE == (guint64)-1, so this will prefer valid timestamps. */
300 if (!earliest || GST_BUFFER_PTS(buffer) < GST_BUFFER_PTS(earliest))
302 stream = parser->streams[i];
303 earliest = buffer;
307 buffer = earliest;
310 if (!buffer)
312 pthread_mutex_unlock(&parser->mutex);
313 return S_FALSE;
316 /* FIXME: Should we use gst_segment_to_stream_time_full()? Under what
317 * circumstances is the stream time not equal to the buffer PTS? Note
318 * that this will need modification to wg_parser_stream_notify_qos() as
319 * well. */
321 if ((wg_buffer->has_pts = GST_BUFFER_PTS_IS_VALID(buffer)))
322 wg_buffer->pts = GST_BUFFER_PTS(buffer) / 100;
323 if ((wg_buffer->has_duration = GST_BUFFER_DURATION_IS_VALID(buffer)))
324 wg_buffer->duration = GST_BUFFER_DURATION(buffer) / 100;
325 wg_buffer->discontinuity = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT);
326 wg_buffer->preroll = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_LIVE);
327 wg_buffer->delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
328 wg_buffer->size = gst_buffer_get_size(buffer);
329 wg_buffer->stream = stream->number;
331 pthread_mutex_unlock(&parser->mutex);
332 return S_OK;
335 static NTSTATUS wg_parser_stream_copy_buffer(void *args)
337 const struct wg_parser_stream_copy_buffer_params *params = args;
338 struct wg_parser_stream *stream = params->stream;
339 struct wg_parser *parser = stream->parser;
340 uint32_t offset = params->offset;
341 uint32_t size = params->size;
343 pthread_mutex_lock(&parser->mutex);
345 if (!stream->buffer)
347 pthread_mutex_unlock(&parser->mutex);
348 return VFW_E_WRONG_STATE;
351 assert(offset < stream->map_info.size);
352 assert(offset + size <= stream->map_info.size);
353 memcpy(params->data, stream->map_info.data + offset, size);
355 pthread_mutex_unlock(&parser->mutex);
356 return S_OK;
359 static NTSTATUS wg_parser_stream_release_buffer(void *args)
361 struct wg_parser_stream *stream = args;
362 struct wg_parser *parser = stream->parser;
364 pthread_mutex_lock(&parser->mutex);
366 assert(stream->buffer);
368 gst_buffer_unmap(stream->buffer, &stream->map_info);
369 gst_buffer_unref(stream->buffer);
370 stream->buffer = NULL;
372 pthread_mutex_unlock(&parser->mutex);
373 pthread_cond_signal(&stream->event_empty_cond);
375 return S_OK;
378 static NTSTATUS wg_parser_stream_get_duration(void *args)
380 struct wg_parser_stream_get_duration_params *params = args;
382 params->duration = params->stream->duration;
383 return S_OK;
386 static NTSTATUS wg_parser_stream_get_tag(void *args)
388 struct wg_parser_stream_get_tag_params *params = args;
389 uint32_t len;
391 if (params->tag >= WG_PARSER_TAG_COUNT)
392 return STATUS_INVALID_PARAMETER;
393 if (!params->stream->tags[params->tag])
394 return STATUS_NOT_FOUND;
395 if ((len = strlen(params->stream->tags[params->tag]) + 1) > *params->size)
397 *params->size = len;
398 return STATUS_BUFFER_TOO_SMALL;
400 memcpy(params->buffer, params->stream->tags[params->tag], len);
401 return STATUS_SUCCESS;
404 static NTSTATUS wg_parser_stream_seek(void *args)
406 GstSeekType start_type = GST_SEEK_TYPE_SET, stop_type = GST_SEEK_TYPE_SET;
407 const struct wg_parser_stream_seek_params *params = args;
408 DWORD start_flags = params->start_flags;
409 DWORD stop_flags = params->stop_flags;
410 GstSeekFlags flags = 0;
412 if (start_flags & AM_SEEKING_SeekToKeyFrame)
413 flags |= GST_SEEK_FLAG_KEY_UNIT;
414 if (start_flags & AM_SEEKING_Segment)
415 flags |= GST_SEEK_FLAG_SEGMENT;
416 if (!(start_flags & AM_SEEKING_NoFlush))
417 flags |= GST_SEEK_FLAG_FLUSH;
419 if ((start_flags & AM_SEEKING_PositioningBitsMask) == AM_SEEKING_NoPositioning)
420 start_type = GST_SEEK_TYPE_NONE;
421 if ((stop_flags & AM_SEEKING_PositioningBitsMask) == AM_SEEKING_NoPositioning)
422 stop_type = GST_SEEK_TYPE_NONE;
424 if (!gst_pad_push_event(params->stream->my_sink, gst_event_new_seek(params->rate, GST_FORMAT_TIME,
425 flags, start_type, params->start_pos * 100, stop_type, params->stop_pos * 100)))
426 GST_ERROR("Failed to seek.\n");
428 return S_OK;
431 static NTSTATUS wg_parser_stream_notify_qos(void *args)
433 const struct wg_parser_stream_notify_qos_params *params = args;
434 struct wg_parser_stream *stream = params->stream;
435 GstClockTime stream_time;
436 GstEvent *event;
438 /* We return timestamps in stream time, i.e. relative to the start of the
439 * file (or other medium), but gst_event_new_qos() expects the timestamp in
440 * running time. */
441 stream_time = gst_segment_to_running_time(&stream->segment, GST_FORMAT_TIME, params->timestamp * 100);
442 if (stream_time == -1)
444 /* This can happen legitimately if the sample falls outside of the
445 * segment bounds. GStreamer elements shouldn't present the sample in
446 * that case, but DirectShow doesn't care. */
447 GST_LOG("Ignoring QoS event.\n");
448 return S_OK;
451 if (!(event = gst_event_new_qos(params->underflow ? GST_QOS_TYPE_UNDERFLOW : GST_QOS_TYPE_OVERFLOW,
452 params->proportion, params->diff * 100, stream_time)))
453 GST_ERROR("Failed to create QOS event.\n");
454 gst_pad_push_event(stream->my_sink, event);
456 return S_OK;
459 static bool parser_no_more_pads(struct wg_parser *parser)
461 unsigned int i;
463 for (i = 0; i < parser->stream_count; ++i)
465 if (!parser->streams[i]->no_more_pads)
466 return false;
469 return parser->no_more_pads;
472 static gboolean autoplug_continue_cb(GstElement * decodebin, GstPad *pad, GstCaps * caps, gpointer user)
474 struct wg_format format;
476 wg_format_from_caps(&format, caps);
478 if (format.major_type != WG_MAJOR_TYPE_UNKNOWN
479 && format.major_type != WG_MAJOR_TYPE_VIDEO
480 && format.major_type != WG_MAJOR_TYPE_AUDIO)
481 return false;
483 return true;
486 static GstAutoplugSelectResult autoplug_select_cb(GstElement *bin, GstPad *pad,
487 GstCaps *caps, GstElementFactory *fact, gpointer user)
489 struct wg_parser *parser = user;
490 const char *name = gst_element_factory_get_longname(fact);
491 const char *klass = gst_element_factory_get_klass(fact);
493 GST_INFO("Using \"%s\".", name);
495 if (strstr(name, "Player protection"))
497 GST_WARNING("Blacklisted a/52 decoder because it only works in Totem.");
498 return GST_AUTOPLUG_SELECT_SKIP;
500 if (!strcmp(name, "Fluendo Hardware Accelerated Video Decoder"))
502 GST_WARNING("Disabled video acceleration since it breaks in wine.");
503 return GST_AUTOPLUG_SELECT_SKIP;
506 if (!parser->sink_caps && strstr(klass, GST_ELEMENT_FACTORY_KLASS_DEMUXER))
507 parser->sink_caps = g_strdup(gst_structure_get_name(gst_caps_get_structure(caps, 0)));
509 return GST_AUTOPLUG_SELECT_TRY;
512 static void no_more_pads_cb(GstElement *element, gpointer user)
514 struct wg_parser *parser = user;
516 GST_DEBUG("parser %p.", parser);
518 pthread_mutex_lock(&parser->mutex);
519 parser->no_more_pads = true;
520 pthread_mutex_unlock(&parser->mutex);
521 pthread_cond_signal(&parser->init_cond);
524 static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
526 struct wg_parser_stream *stream = gst_pad_get_element_private(pad);
527 struct wg_parser *parser = stream->parser;
529 GST_LOG("stream %p, type \"%s\".", stream, GST_EVENT_TYPE_NAME(event));
531 switch (event->type)
533 case GST_EVENT_SEGMENT:
534 pthread_mutex_lock(&parser->mutex);
535 if (stream->enabled)
537 const GstSegment *segment;
539 gst_event_parse_segment(event, &segment);
541 if (segment->format != GST_FORMAT_TIME)
543 pthread_mutex_unlock(&parser->mutex);
544 GST_FIXME("Unhandled format \"%s\".", gst_format_get_name(segment->format));
545 break;
548 gst_segment_copy_into(segment, &stream->segment);
550 pthread_mutex_unlock(&parser->mutex);
551 break;
553 case GST_EVENT_EOS:
554 pthread_mutex_lock(&parser->mutex);
555 stream->eos = true;
556 if (stream->enabled)
557 pthread_cond_signal(&stream->event_cond);
558 else
559 pthread_cond_signal(&parser->init_cond);
560 pthread_mutex_unlock(&parser->mutex);
561 break;
563 case GST_EVENT_FLUSH_START:
564 pthread_mutex_lock(&parser->mutex);
566 if (stream->enabled)
568 stream->flushing = true;
569 pthread_cond_signal(&stream->event_empty_cond);
571 if (stream->buffer)
573 gst_buffer_unmap(stream->buffer, &stream->map_info);
574 gst_buffer_unref(stream->buffer);
575 stream->buffer = NULL;
579 pthread_mutex_unlock(&parser->mutex);
580 break;
582 case GST_EVENT_FLUSH_STOP:
584 gboolean reset_time;
586 gst_event_parse_flush_stop(event, &reset_time);
588 if (reset_time)
589 gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED);
591 pthread_mutex_lock(&parser->mutex);
593 stream->eos = false;
594 if (stream->enabled)
595 stream->flushing = false;
597 pthread_mutex_unlock(&parser->mutex);
598 break;
601 case GST_EVENT_CAPS:
603 GstCaps *caps;
605 gst_event_parse_caps(event, &caps);
606 pthread_mutex_lock(&parser->mutex);
607 wg_format_from_caps(&stream->preferred_format, caps);
608 stream->has_caps = true;
609 pthread_mutex_unlock(&parser->mutex);
610 pthread_cond_signal(&parser->init_cond);
611 break;
614 case GST_EVENT_TAG:
615 pthread_mutex_lock(&parser->mutex);
616 stream->has_tags = true;
617 pthread_cond_signal(&parser->init_cond);
618 pthread_mutex_unlock(&parser->mutex);
619 break;
621 default:
622 GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
624 gst_event_unref(event);
625 return TRUE;
628 static GstFlowReturn sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer)
630 struct wg_parser_stream *stream = gst_pad_get_element_private(pad);
631 struct wg_parser *parser = stream->parser;
633 GST_LOG("stream %p, buffer %p.", stream, buffer);
635 pthread_mutex_lock(&parser->mutex);
637 if (!stream->has_buffer)
639 stream->has_buffer = true;
640 pthread_cond_signal(&parser->init_cond);
643 /* Allow this buffer to be flushed by GStreamer. We are effectively
644 * implementing a queue object here. */
646 while (stream->enabled && !stream->flushing && stream->buffer)
647 pthread_cond_wait(&stream->event_empty_cond, &parser->mutex);
649 if (!stream->enabled)
651 pthread_mutex_unlock(&parser->mutex);
652 gst_buffer_unref(buffer);
653 return GST_FLOW_OK;
656 if (stream->flushing)
658 pthread_mutex_unlock(&parser->mutex);
659 GST_DEBUG("Stream is flushing; discarding buffer.");
660 gst_buffer_unref(buffer);
661 return GST_FLOW_FLUSHING;
664 if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ))
666 pthread_mutex_unlock(&parser->mutex);
667 GST_ERROR("Failed to map buffer.\n");
668 gst_buffer_unref(buffer);
669 return GST_FLOW_ERROR;
672 stream->buffer = buffer;
674 pthread_mutex_unlock(&parser->mutex);
675 pthread_cond_signal(&stream->event_cond);
677 /* The chain callback is given a reference to the buffer. Transfer that
678 * reference to the stream object, which will release it in
679 * wg_parser_stream_release_buffer(). */
681 GST_LOG("Buffer queued.");
682 return GST_FLOW_OK;
685 static gboolean sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
687 struct wg_parser_stream *stream = gst_pad_get_element_private(pad);
688 struct wg_parser *parser = stream->parser;
690 GST_LOG("stream %p, type \"%s\".", stream, gst_query_type_get_name(query->type));
692 switch (query->type)
694 case GST_QUERY_CAPS:
696 GstCaps *caps, *filter, *temp;
697 gchar *str;
698 gsize i;
700 gst_query_parse_caps(query, &filter);
702 pthread_mutex_lock(&parser->mutex);
703 caps = wg_format_to_caps(&stream->current_format);
704 pthread_mutex_unlock(&parser->mutex);
706 if (!caps)
707 return FALSE;
709 /* Clear some fields that shouldn't prevent us from connecting. */
710 for (i = 0; i < gst_caps_get_size(caps); ++i)
711 gst_structure_remove_fields(gst_caps_get_structure(caps, i),
712 "framerate", "pixel-aspect-ratio", "colorimetry", "chroma-site", NULL);
714 str = gst_caps_to_string(caps);
715 GST_LOG("Stream caps are \"%s\".", str);
716 g_free(str);
718 if (filter)
720 temp = gst_caps_intersect(caps, filter);
721 gst_caps_unref(caps);
722 caps = temp;
725 gst_query_set_caps_result(query, caps);
726 gst_caps_unref(caps);
727 return TRUE;
730 case GST_QUERY_ACCEPT_CAPS:
732 struct wg_format format;
733 gboolean ret = TRUE;
734 GstCaps *caps;
736 pthread_mutex_lock(&parser->mutex);
738 if (stream->current_format.major_type == WG_MAJOR_TYPE_UNKNOWN)
740 pthread_mutex_unlock(&parser->mutex);
741 gst_query_set_accept_caps_result(query, TRUE);
742 return TRUE;
745 gst_query_parse_accept_caps(query, &caps);
746 wg_format_from_caps(&format, caps);
747 ret = wg_format_compare(&format, &stream->current_format);
749 pthread_mutex_unlock(&parser->mutex);
751 if (!ret && gst_debug_category_get_threshold(GST_CAT_DEFAULT) >= GST_LEVEL_WARNING)
753 gchar *str = gst_caps_to_string(caps);
754 GST_WARNING("Rejecting caps \"%s\".", str);
755 g_free(str);
757 gst_query_set_accept_caps_result(query, ret);
758 return TRUE;
761 default:
762 return gst_pad_query_default (pad, parent, query);
766 static struct wg_parser_stream *create_stream(struct wg_parser *parser)
768 struct wg_parser_stream *stream, **new_array;
769 char pad_name[19];
771 if (!(new_array = realloc(parser->streams, (parser->stream_count + 1) * sizeof(*parser->streams))))
772 return NULL;
773 parser->streams = new_array;
775 if (!(stream = calloc(1, sizeof(*stream))))
776 return NULL;
778 gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED);
780 stream->parser = parser;
781 stream->number = parser->stream_count;
782 stream->no_more_pads = true;
783 stream->current_format.major_type = WG_MAJOR_TYPE_UNKNOWN;
784 pthread_cond_init(&stream->event_cond, NULL);
785 pthread_cond_init(&stream->event_empty_cond, NULL);
787 sprintf(pad_name, "qz_sink_%u", parser->stream_count);
788 stream->my_sink = gst_pad_new(pad_name, GST_PAD_SINK);
789 gst_pad_set_element_private(stream->my_sink, stream);
790 gst_pad_set_chain_function(stream->my_sink, sink_chain_cb);
791 gst_pad_set_event_function(stream->my_sink, sink_event_cb);
792 gst_pad_set_query_function(stream->my_sink, sink_query_cb);
794 parser->streams[parser->stream_count++] = stream;
795 return stream;
798 static void free_stream(struct wg_parser_stream *stream)
800 unsigned int i;
802 if (stream->their_src)
803 gst_object_unref(stream->their_src);
804 gst_object_unref(stream->my_sink);
806 pthread_cond_destroy(&stream->event_cond);
807 pthread_cond_destroy(&stream->event_empty_cond);
809 for (i = 0; i < ARRAY_SIZE(stream->tags); ++i)
811 if (stream->tags[i])
812 g_free(stream->tags[i]);
814 free(stream);
817 static bool stream_create_post_processing_elements(struct wg_parser_stream *stream)
819 GstElement *element = NULL, *first = NULL, *last = NULL;
820 struct wg_parser *parser = stream->parser;
821 GstPad *pad = stream->their_src;
822 const char *name;
823 GstCaps *caps;
824 int ret;
826 caps = gst_pad_query_caps(pad, NULL);
827 name = gst_structure_get_name(gst_caps_get_structure(caps, 0));
828 gst_caps_unref(caps);
830 if (!strcmp(name, "video/x-raw"))
832 /* DirectShow can express interlaced video, but downstream filters can't
833 * necessarily consume it. In particular, the video renderer can't. */
834 if (!(element = create_element("deinterlace", "good"))
835 || !append_element(parser->container, element, &first, &last))
836 return false;
838 /* decodebin considers many YUV formats to be "raw", but some quartz
839 * filters can't handle those. Also, videoflip can't handle all "raw"
840 * formats either. Add a videoconvert to swap color spaces. */
841 if (!(element = create_element("videoconvert", "base"))
842 || !append_element(parser->container, element, &first, &last))
843 return false;
845 /* GStreamer outputs RGB video top-down, but DirectShow expects bottom-up. */
846 if (!(element = create_element("videoflip", "good"))
847 || !append_element(parser->container, element, &first, &last))
848 return false;
849 stream->flip = element;
851 /* videoflip does not support 15 and 16-bit RGB so add a second videoconvert
852 * to do the final conversion. */
853 if (!(element = create_element("videoconvert", "base"))
854 || !append_element(parser->container, element, &first, &last))
855 return false;
857 if (!link_src_to_element(pad, first) || !link_element_to_sink(last, stream->my_sink))
858 return false;
860 else if (!strcmp(name, "audio/x-raw"))
862 /* Currently our dsound can't handle 64-bit formats or all
863 * surround-sound configurations. Native dsound can't always handle
864 * 64-bit formats either. Add an audioconvert to allow changing bit
865 * depth and channel count. */
866 if (!(element = create_element("audioconvert", "base"))
867 || !append_element(parser->container, element, &first, &last))
868 return false;
870 if (!link_src_to_element(pad, first) || !link_element_to_sink(last, stream->my_sink))
871 return false;
873 else if ((ret = gst_pad_link(pad, stream->my_sink)) < 0)
875 GST_ERROR("Failed to link decodebin source pad to our sink pad, error %s.",
876 gst_pad_link_get_name(ret));
877 return false;
880 return true;
883 static void stream_decodebin_no_more_pads_cb(GstElement *element, gpointer user)
885 struct wg_parser_stream *stream = user;
886 struct wg_parser *parser = stream->parser;
888 GST_DEBUG("stream %p, parser %p, element %p.", stream, parser, element);
890 pthread_mutex_lock(&parser->mutex);
891 stream->no_more_pads = true;
892 pthread_mutex_unlock(&parser->mutex);
893 pthread_cond_signal(&parser->init_cond);
896 static void stream_decodebin_pad_added_cb(GstElement *element, GstPad *pad, gpointer user)
898 struct wg_parser_stream *stream = user;
899 struct wg_parser *parser = stream->parser;
901 GST_LOG("stream %p, parser %p, element %p, pad %p.", stream, parser, element, pad);
903 if (gst_pad_is_linked(pad))
904 return;
906 gst_object_ref(stream->their_src = pad);
907 if (!stream_create_post_processing_elements(stream))
908 return;
909 gst_pad_set_active(stream->my_sink, 1);
912 static bool stream_decodebin_create(struct wg_parser_stream *stream)
914 struct wg_parser *parser = stream->parser;
916 GST_LOG("stream %p, parser %p.", stream, parser);
918 if (!(stream->decodebin = create_element("decodebin", "base")))
919 return false;
920 gst_bin_add(GST_BIN(parser->container), stream->decodebin);
922 g_signal_connect(stream->decodebin, "pad-added", G_CALLBACK(stream_decodebin_pad_added_cb), stream);
923 g_signal_connect(stream->decodebin, "autoplug-select", G_CALLBACK(autoplug_select_cb), stream);
924 g_signal_connect(stream->decodebin, "no-more-pads", G_CALLBACK(stream_decodebin_no_more_pads_cb), stream);
926 pthread_mutex_lock(&parser->mutex);
927 stream->no_more_pads = false;
928 pthread_mutex_unlock(&parser->mutex);
929 gst_element_sync_state_with_parent(stream->decodebin);
931 GST_LOG("Created stream decodebin %p for %u.", stream->decodebin, stream->number);
933 return true;
936 static void pad_added_cb(GstElement *element, GstPad *pad, gpointer user)
938 struct wg_parser_stream *stream;
939 struct wg_parser *parser = user;
940 GstCaps *caps;
942 GST_LOG("parser %p, element %p, pad %p.", parser, element, pad);
944 if (gst_pad_is_linked(pad))
945 return;
947 if (!(stream = create_stream(parser)))
948 return;
950 caps = gst_pad_query_caps(pad, NULL);
951 wg_format_from_caps(&stream->stream_format, caps);
952 gst_caps_unref(caps);
954 /* For compressed stream, create an extra decodebin to decode it. */
955 if (stream->stream_format.major_type != WG_MAJOR_TYPE_UNKNOWN
956 && stream->stream_format.major_type != WG_MAJOR_TYPE_VIDEO
957 && stream->stream_format.major_type != WG_MAJOR_TYPE_AUDIO)
959 if (!stream_decodebin_create(stream))
961 GST_ERROR("Failed to create decodebin for stream %u.", stream->number);
962 return;
965 if (!link_src_to_element(pad, stream->decodebin))
966 GST_ERROR("Failed to link pad %p to stream decodebin %p for stream %u.",
967 pad, stream->decodebin, stream->number);
969 return;
972 gst_object_ref(stream->their_src = pad);
973 if (!stream_create_post_processing_elements(stream))
974 return;
975 gst_pad_set_active(stream->my_sink, 1);
978 static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user)
980 struct wg_parser *parser = user;
981 bool done = false;
982 unsigned int i;
983 char *name;
985 GST_LOG("parser %p, element %p, pad %p.", parser, element, pad);
987 for (i = 0; i < parser->stream_count; ++i)
989 struct wg_parser_stream *stream = parser->streams[i];
990 GstPad *stream_decodebin_sink_peer = NULL;
991 GstPad *stream_decodebin_sink = NULL;
993 if (stream->decodebin)
995 stream_decodebin_sink = gst_element_get_static_pad(stream->decodebin, "sink");
996 stream_decodebin_sink_peer = gst_pad_get_peer(stream_decodebin_sink);
999 if (stream->their_src == pad || stream_decodebin_sink_peer == pad)
1001 gst_object_unref(stream->their_src);
1002 stream->their_src = NULL;
1004 if (stream_decodebin_sink_peer == pad)
1005 gst_pad_unlink(pad, stream_decodebin_sink);
1007 done = true;
1010 if (stream_decodebin_sink_peer)
1011 gst_object_unref(stream_decodebin_sink_peer);
1012 if (stream_decodebin_sink)
1013 gst_object_unref(stream_decodebin_sink);
1015 if (done)
1016 return;
1019 name = gst_pad_get_name(pad);
1020 GST_WARNING("No pin matching pad \"%s\" found.", name);
1021 g_free(name);
1024 static GstFlowReturn issue_read_request(struct wg_parser *parser, guint64 offset, guint size, GstBuffer **buffer)
1026 GstFlowReturn ret;
1028 pthread_mutex_lock(&parser->mutex);
1030 assert(!parser->read_request.size);
1031 parser->read_request.buffer = *buffer;
1032 parser->read_request.offset = offset;
1033 parser->read_request.size = size;
1034 parser->read_request.done = false;
1035 pthread_cond_signal(&parser->read_cond);
1037 /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect
1038 * the upstream pin to flush if necessary. We should never be blocked on
1039 * read_thread() not running. */
1041 while (!parser->read_request.done)
1042 pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
1044 *buffer = parser->read_request.buffer;
1045 ret = parser->read_request.ret;
1047 pthread_mutex_unlock(&parser->mutex);
1049 GST_LOG("Request returned %s.", gst_flow_get_name(ret));
1051 return ret;
1054 static struct input_cache_chunk * get_cache_entry(struct wg_parser *parser, guint64 position)
1056 struct input_cache_chunk chunk;
1057 unsigned int i;
1059 for (i = 0; i < ARRAY_SIZE(parser->input_cache_chunks); i++)
1061 chunk = parser->input_cache_chunks[i];
1063 if (chunk.data && position == chunk.position)
1065 if (i != 0)
1067 memmove(&parser->input_cache_chunks[1], &parser->input_cache_chunks[0], i * sizeof(chunk));
1068 parser->input_cache_chunks[0] = chunk;
1071 return &parser->input_cache_chunks[0];
1075 return NULL;
1078 static GstFlowReturn read_cached_chunk(struct wg_parser *parser, guint64 chunk_position, unsigned int chunk_offset, GstBuffer *buffer, guint64 buffer_offset)
1080 struct input_cache_chunk *chunk;
1081 GstBuffer *chunk_buffer;
1082 void *chunk_data;
1083 GstFlowReturn ret;
1085 if ((chunk = get_cache_entry(parser, chunk_position)))
1087 if (!!gst_buffer_fill(buffer, buffer_offset, chunk->data + chunk_offset, input_cache_chunk_size - chunk_offset))
1088 return GST_FLOW_OK;
1089 else
1090 return GST_FLOW_ERROR;
1093 chunk = &parser->input_cache_chunks[ ARRAY_SIZE(parser->input_cache_chunks) - 1 ];
1095 if (!(chunk_data = chunk->data))
1096 chunk_data = malloc(input_cache_chunk_size);
1098 chunk_buffer = gst_buffer_new_wrapped_full(0, chunk_data, input_cache_chunk_size, 0, input_cache_chunk_size, NULL, NULL);
1099 ret = issue_read_request(parser, chunk_position, input_cache_chunk_size, &chunk_buffer);
1100 gst_buffer_unref(chunk_buffer);
1102 if (ret != GST_FLOW_OK)
1104 if (!chunk->data)
1105 free(chunk_data);
1106 return ret;
1109 memmove(&parser->input_cache_chunks[1], &parser->input_cache_chunks[0], (ARRAY_SIZE(parser->input_cache_chunks) - 1) * sizeof(*chunk));
1110 parser->input_cache_chunks[0].data = chunk_data;
1111 parser->input_cache_chunks[0].position = chunk_position;
1113 chunk = &parser->input_cache_chunks[0];
1114 if (!!gst_buffer_fill(buffer, buffer_offset, chunk->data + chunk_offset, input_cache_chunk_size - chunk_offset))
1115 return GST_FLOW_OK;
1116 else
1117 return GST_FLOW_ERROR;
1120 static GstFlowReturn read_input_cache(struct wg_parser *parser, guint64 offset, guint size, GstBuffer **buffer)
1122 unsigned int i, chunk_count, chunk_offset, buffer_offset = 0;
1123 GstBuffer *working_buffer;
1124 guint64 chunk_position;
1125 GstFlowReturn ret;
1127 working_buffer = *buffer;
1128 if (!working_buffer)
1129 working_buffer = gst_buffer_new_and_alloc(size);
1131 chunk_position = offset - (offset % input_cache_chunk_size);
1132 chunk_count = (offset + size + input_cache_chunk_size - chunk_position - 1) / input_cache_chunk_size;
1133 chunk_offset = offset - chunk_position;
1135 for (i = 0; i < chunk_count; i++)
1137 if ((ret = read_cached_chunk(parser, chunk_position, chunk_offset, working_buffer, buffer_offset)) != GST_FLOW_OK)
1139 if (!*buffer)
1140 gst_buffer_unref(working_buffer);
1141 return ret;
1144 chunk_position += input_cache_chunk_size;
1145 buffer_offset += input_cache_chunk_size - chunk_offset;
1146 chunk_offset = 0;
1149 *buffer = working_buffer;
1150 return GST_FLOW_OK;
1153 static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
1154 guint64 offset, guint size, GstBuffer **buffer)
1156 struct wg_parser *parser = gst_pad_get_element_private(pad);
1158 GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, size %u, buffer %p.", pad, offset, size, *buffer);
1160 if (offset == GST_BUFFER_OFFSET_NONE)
1161 offset = parser->next_pull_offset;
1162 parser->next_pull_offset = offset + size;
1164 if (!size)
1166 /* asfreader occasionally asks for zero bytes. gst_buffer_map() will
1167 * return NULL in this case. Avoid confusing the read thread by asking
1168 * it for zero bytes. */
1169 if (!*buffer)
1170 *buffer = gst_buffer_new_and_alloc(0);
1171 gst_buffer_set_size(*buffer, 0);
1172 GST_LOG("Returning empty buffer.");
1173 return GST_FLOW_OK;
1176 if (size >= input_cache_chunk_size || sizeof(void*) == 4)
1177 return issue_read_request(parser, offset, size, buffer);
1179 if (offset >= parser->file_size)
1180 return GST_FLOW_EOS;
1182 if ((offset + size) >= parser->file_size)
1183 size = parser->file_size - offset;
1185 return read_input_cache(parser, offset, size, buffer);
1188 static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
1190 struct wg_parser *parser = gst_pad_get_element_private(pad);
1191 GstFormat format;
1193 GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query));
1195 switch (GST_QUERY_TYPE(query))
1197 case GST_QUERY_DURATION:
1198 gst_query_parse_duration(query, &format, NULL);
1199 if (format == GST_FORMAT_PERCENT)
1201 gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX);
1202 return TRUE;
1204 else if (format == GST_FORMAT_BYTES)
1206 gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size);
1207 return TRUE;
1209 return FALSE;
1211 case GST_QUERY_SEEKING:
1212 gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1213 if (format != GST_FORMAT_BYTES)
1215 GST_WARNING("Cannot seek using format \"%s\".", gst_format_get_name(format));
1216 return FALSE;
1218 gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size);
1219 return TRUE;
1221 case GST_QUERY_SCHEDULING:
1222 gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1223 gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH);
1224 gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL);
1225 return TRUE;
1227 default:
1228 GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query));
1229 return FALSE;
1233 static void *push_data(void *arg)
1235 struct wg_parser *parser = arg;
1236 GstBuffer *buffer;
1237 guint max_size;
1239 GST_DEBUG("Starting push thread.");
1241 if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL)))
1243 GST_ERROR("Failed to allocate memory.");
1244 return NULL;
1247 max_size = parser->stop_offset ? parser->stop_offset : parser->file_size;
1249 for (;;)
1251 ULONG size;
1252 int ret;
1254 if (parser->next_offset >= max_size)
1255 break;
1256 size = min(16384, max_size - parser->next_offset);
1258 if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0)
1260 GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret));
1261 break;
1264 parser->next_offset += size;
1266 buffer->duration = buffer->pts = -1;
1267 if ((ret = gst_pad_push(parser->my_src, buffer)) < 0)
1269 GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret));
1270 break;
1274 gst_buffer_unref(buffer);
1276 gst_pad_push_event(parser->my_src, gst_event_new_eos());
1278 GST_DEBUG("Stopping push thread.");
1280 return NULL;
1283 static gboolean activate_push(GstPad *pad, gboolean activate)
1285 struct wg_parser *parser = gst_pad_get_element_private(pad);
1287 if (!activate)
1289 if (parser->push_thread)
1291 pthread_join(parser->push_thread, NULL);
1292 parser->push_thread = 0;
1295 else if (!parser->push_thread)
1297 int ret;
1299 if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser)))
1301 GST_ERROR("Failed to create push thread: %s", strerror(errno));
1302 parser->push_thread = 0;
1303 return FALSE;
1306 return TRUE;
1309 static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate)
1311 struct wg_parser *parser = gst_pad_get_element_private(pad);
1313 GST_DEBUG("%s source pad for parser %p in %s mode.",
1314 activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode));
1316 switch (mode)
1318 case GST_PAD_MODE_PULL:
1319 return TRUE;
1320 case GST_PAD_MODE_PUSH:
1321 return activate_push(pad, activate);
1322 case GST_PAD_MODE_NONE:
1323 break;
1325 return FALSE;
1328 static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user)
1330 struct wg_parser *parser = user;
1331 gchar *dbg_info = NULL;
1332 GError *err = NULL;
1334 GST_DEBUG("parser %p, message type %s.", parser, GST_MESSAGE_TYPE_NAME(msg));
1336 switch (msg->type)
1338 case GST_MESSAGE_ERROR:
1339 gst_message_parse_error(msg, &err, &dbg_info);
1340 if (parser->err_on)
1342 fprintf(stderr, "winegstreamer error: %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
1343 fprintf(stderr, "winegstreamer error: %s: %s\n", GST_OBJECT_NAME(msg->src), dbg_info);
1345 g_error_free(err);
1346 g_free(dbg_info);
1347 pthread_mutex_lock(&parser->mutex);
1348 parser->error = true;
1349 pthread_mutex_unlock(&parser->mutex);
1350 pthread_cond_signal(&parser->init_cond);
1351 break;
1353 case GST_MESSAGE_WARNING:
1354 gst_message_parse_warning(msg, &err, &dbg_info);
1355 if (parser->warn_on)
1357 fprintf(stderr, "winegstreamer warning: %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
1358 fprintf(stderr, "winegstreamer warning: %s: %s\n", GST_OBJECT_NAME(msg->src), dbg_info);
1360 g_error_free(err);
1361 g_free(dbg_info);
1362 break;
1364 case GST_MESSAGE_DURATION_CHANGED:
1365 pthread_mutex_lock(&parser->mutex);
1366 parser->has_duration = true;
1367 pthread_mutex_unlock(&parser->mutex);
1368 pthread_cond_signal(&parser->init_cond);
1369 break;
1371 default:
1372 break;
1374 gst_message_unref(msg);
1375 return GST_BUS_DROP;
1378 static gboolean src_perform_seek(struct wg_parser *parser, GstEvent *event)
1380 BOOL thread = !!parser->push_thread;
1381 GstSeekType cur_type, stop_type;
1382 GstFormat seek_format;
1383 GstEvent *flush_event;
1384 GstSeekFlags flags;
1385 gint64 cur, stop;
1386 guint32 seqnum;
1387 gdouble rate;
1389 gst_event_parse_seek(event, &rate, &seek_format, &flags,
1390 &cur_type, &cur, &stop_type, &stop);
1392 if (seek_format != GST_FORMAT_BYTES)
1394 GST_FIXME("Unhandled format \"%s\".", gst_format_get_name(seek_format));
1395 return FALSE;
1398 seqnum = gst_event_get_seqnum(event);
1400 /* send flush start */
1401 if (flags & GST_SEEK_FLAG_FLUSH)
1403 flush_event = gst_event_new_flush_start();
1404 gst_event_set_seqnum(flush_event, seqnum);
1405 gst_pad_push_event(parser->my_src, flush_event);
1406 if (thread)
1407 gst_pad_set_active(parser->my_src, 1);
1410 parser->next_offset = parser->start_offset = cur;
1412 /* and prepare to continue streaming */
1413 if (flags & GST_SEEK_FLAG_FLUSH)
1415 flush_event = gst_event_new_flush_stop(TRUE);
1416 gst_event_set_seqnum(flush_event, seqnum);
1417 gst_pad_push_event(parser->my_src, flush_event);
1418 if (thread)
1419 gst_pad_set_active(parser->my_src, 1);
1422 return TRUE;
1425 static gboolean src_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
1427 struct wg_parser *parser = gst_pad_get_element_private(pad);
1428 gboolean ret = TRUE;
1430 GST_LOG("parser %p, type \"%s\".", parser, GST_EVENT_TYPE_NAME(event));
1432 switch (event->type)
1434 case GST_EVENT_SEEK:
1435 ret = src_perform_seek(parser, event);
1436 break;
1438 case GST_EVENT_FLUSH_START:
1439 case GST_EVENT_FLUSH_STOP:
1440 case GST_EVENT_QOS:
1441 case GST_EVENT_RECONFIGURE:
1442 break;
1444 default:
1445 GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
1446 ret = FALSE;
1447 break;
1449 gst_event_unref(event);
1450 return ret;
1453 static void query_tags(struct wg_parser_stream *stream)
1455 const gchar *struct_name;
1456 GstEvent *tag_event;
1457 guint i, j;
1459 stream->tags[WG_PARSER_TAG_NAME] = NULL;
1460 stream->tags[WG_PARSER_TAG_LANGUAGE] = NULL;
1462 i = 0;
1463 while ((tag_event = gst_pad_get_sticky_event(stream->their_src, GST_EVENT_TAG, i++)))
1465 GstTagList *tag_list;
1467 gst_event_parse_tag(tag_event, &tag_list);
1469 if (!stream->tags[WG_PARSER_TAG_NAME])
1471 /* Extract stream name from Quick Time demuxer private tag where it puts unrecognized chunks. */
1472 const GValue *val;
1473 GstSample *sample;
1474 GstBuffer *buf;
1475 gsize size;
1476 guint tag_count = gst_tag_list_get_tag_size(tag_list, "private-qt-tag");
1478 for (j = 0; j < tag_count; ++j)
1480 if (!(val = gst_tag_list_get_value_index(tag_list, "private-qt-tag", j)))
1481 continue;
1482 if (!GST_VALUE_HOLDS_SAMPLE(val) || !(sample = gst_value_get_sample(val)))
1483 continue;
1484 struct_name = gst_structure_get_name(gst_sample_get_info(sample));
1485 if (!struct_name || strcmp(struct_name, "application/x-gst-qt-name-tag"))
1486 continue;
1487 if (!(buf = gst_sample_get_buffer(sample)))
1488 continue;
1489 if ((size = gst_buffer_get_size(buf)) < 8)
1490 continue;
1491 size -= 8;
1492 if (!(stream->tags[WG_PARSER_TAG_NAME] = g_malloc(size + 1)))
1493 continue;
1494 if (gst_buffer_extract(buf, 8, stream->tags[WG_PARSER_TAG_NAME], size) != size)
1496 g_free(stream->tags[WG_PARSER_TAG_NAME]);
1497 stream->tags[WG_PARSER_TAG_NAME] = NULL;
1498 continue;
1500 stream->tags[WG_PARSER_TAG_NAME][size] = 0;
1504 if (!stream->tags[WG_PARSER_TAG_LANGUAGE])
1506 gchar *lang_code = NULL;
1508 gst_tag_list_get_string(tag_list, GST_TAG_LANGUAGE_CODE, &lang_code);
1509 if (stream->parser->sink_caps && !strcmp(stream->parser->sink_caps, "video/quicktime"))
1511 /* For QuickTime media, we convert the language tags to ISO 639-1. */
1512 const gchar *lang_code_iso_639_1 = lang_code ? gst_tag_get_language_code_iso_639_1(lang_code) : NULL;
1513 stream->tags[WG_PARSER_TAG_LANGUAGE] = lang_code_iso_639_1 ? g_strdup(lang_code_iso_639_1) : NULL;
1514 g_free(lang_code);
1516 else
1517 stream->tags[WG_PARSER_TAG_LANGUAGE] = lang_code;
1520 gst_event_unref(tag_event);
1524 static NTSTATUS wg_parser_connect(void *args)
1526 GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src",
1527 GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
1528 const struct wg_parser_connect_params *params = args;
1529 struct wg_parser *parser = params->parser;
1530 unsigned int i;
1531 int ret;
1533 parser->file_size = params->file_size;
1534 parser->sink_connected = true;
1536 if (!parser->bus)
1538 parser->bus = gst_bus_new();
1539 gst_bus_set_sync_handler(parser->bus, bus_handler_cb, parser, NULL);
1542 parser->container = gst_bin_new(NULL);
1543 gst_element_set_bus(parser->container, parser->bus);
1545 parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src");
1546 gst_pad_set_getrange_function(parser->my_src, src_getrange_cb);
1547 gst_pad_set_query_function(parser->my_src, src_query_cb);
1548 gst_pad_set_activatemode_function(parser->my_src, src_activate_mode_cb);
1549 gst_pad_set_event_function(parser->my_src, src_event_cb);
1550 gst_pad_set_element_private(parser->my_src, parser);
1552 parser->start_offset = parser->next_offset = parser->stop_offset = 0;
1553 parser->next_pull_offset = 0;
1554 parser->error = false;
1556 if (!parser->init_gst(parser))
1557 goto out;
1559 gst_element_set_state(parser->container, GST_STATE_PAUSED);
1560 ret = gst_element_get_state(parser->container, NULL, NULL, -1);
1561 if (ret == GST_STATE_CHANGE_FAILURE)
1563 GST_ERROR("Failed to play stream.\n");
1564 goto out;
1567 pthread_mutex_lock(&parser->mutex);
1569 while (!parser_no_more_pads(parser) && !parser->error)
1570 pthread_cond_wait(&parser->init_cond, &parser->mutex);
1571 if (parser->error)
1573 pthread_mutex_unlock(&parser->mutex);
1574 goto out;
1577 for (i = 0; i < parser->stream_count; ++i)
1579 struct wg_parser_stream *stream = parser->streams[i];
1580 gint64 duration;
1582 /* If we received a buffer, waiting for tags or caps does not make sense anymore. */
1583 while ((!stream->has_caps || !stream->has_tags) && !parser->error && !stream->has_buffer)
1584 pthread_cond_wait(&parser->init_cond, &parser->mutex);
1586 /* GStreamer doesn't actually provide any guarantees about when duration
1587 * is available, even for seekable streams. It's basically built for
1588 * applications that don't care, e.g. movie players that can display
1589 * a duration once it's available, and update it visually if a better
1590 * estimate is found. This doesn't really match well with DirectShow or
1591 * Media Foundation, which both expect duration to be available
1592 * immediately on connecting, so we have to use some complex heuristics
1593 * to try to actually get a usable duration.
1595 * Some elements (avidemux, wavparse, qtdemux) record duration almost
1596 * immediately, before fixing caps. Such elements don't send
1597 * duration-changed messages. Therefore always try querying duration
1598 * after caps have been found.
1600 * Some elements (mpegaudioparse) send duration-changed. In the case of
1601 * a mp3 stream without seek tables it will not be sent immediately, but
1602 * only after enough frames have been parsed to form an estimate. They
1603 * may send it multiple times with increasingly accurate estimates, but
1604 * unfortunately we have no way of knowing whether another estimate will
1605 * be sent, so we always take the first one. We assume that if the
1606 * duration is not immediately available then the element will always
1607 * send duration-changed.
1610 for (;;)
1612 if (parser->error)
1614 pthread_mutex_unlock(&parser->mutex);
1615 goto out;
1617 if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration))
1619 stream->duration = duration / 100;
1620 break;
1623 if (stream->eos)
1625 stream->duration = 0;
1626 GST_WARNING("Failed to query duration.\n");
1627 break;
1630 /* Elements based on GstBaseParse send duration-changed before
1631 * actually updating the duration in GStreamer versions prior
1632 * to 1.17.1. See <gstreamer.git:d28e0b4147fe7073b2>. So after
1633 * receiving duration-changed we have to continue polling until
1634 * the query succeeds. */
1635 if (parser->has_duration)
1637 pthread_mutex_unlock(&parser->mutex);
1638 g_usleep(10000);
1639 pthread_mutex_lock(&parser->mutex);
1641 else
1643 pthread_cond_wait(&parser->init_cond, &parser->mutex);
1647 query_tags(stream);
1649 /* Now that we're fully initialized, enable the stream so that further
1650 * samples get queued instead of being discarded. We don't actually need
1651 * the samples (in particular, the frontend should seek before
1652 * attempting to read anything), but we don't want to waste CPU time
1653 * trying to decode them. */
1654 stream->enabled = true;
1657 pthread_mutex_unlock(&parser->mutex);
1659 parser->next_offset = 0;
1660 return S_OK;
1662 out:
1663 if (parser->container)
1664 gst_element_set_state(parser->container, GST_STATE_NULL);
1665 if (parser->their_sink)
1667 gst_object_unref(parser->their_sink);
1668 parser->my_src = parser->their_sink = NULL;
1671 for (i = 0; i < parser->stream_count; ++i)
1672 free_stream(parser->streams[i]);
1673 parser->stream_count = 0;
1674 free(parser->streams);
1675 parser->streams = NULL;
1677 if (parser->container)
1679 gst_element_set_bus(parser->container, NULL);
1680 gst_object_unref(parser->container);
1681 parser->container = NULL;
1684 g_free(parser->sink_caps);
1685 parser->sink_caps = NULL;
1687 pthread_mutex_lock(&parser->mutex);
1688 parser->sink_connected = false;
1689 pthread_mutex_unlock(&parser->mutex);
1690 pthread_cond_signal(&parser->read_cond);
1692 return E_FAIL;
1695 static NTSTATUS wg_parser_disconnect(void *args)
1697 struct wg_parser *parser = args;
1698 unsigned int i;
1700 /* Unblock all of our streams. */
1701 pthread_mutex_lock(&parser->mutex);
1702 for (i = 0; i < parser->stream_count; ++i)
1704 parser->streams[i]->flushing = true;
1705 pthread_cond_signal(&parser->streams[i]->event_empty_cond);
1707 pthread_mutex_unlock(&parser->mutex);
1709 gst_element_set_state(parser->container, GST_STATE_NULL);
1710 gst_object_unref(parser->my_src);
1711 gst_object_unref(parser->their_sink);
1712 parser->my_src = parser->their_sink = NULL;
1714 pthread_mutex_lock(&parser->mutex);
1715 parser->sink_connected = false;
1716 pthread_mutex_unlock(&parser->mutex);
1717 pthread_cond_signal(&parser->read_cond);
1719 for (i = 0; i < parser->stream_count; ++i)
1720 free_stream(parser->streams[i]);
1722 parser->stream_count = 0;
1723 free(parser->streams);
1724 parser->streams = NULL;
1726 gst_element_set_bus(parser->container, NULL);
1727 gst_object_unref(parser->container);
1728 parser->container = NULL;
1730 g_free(parser->sink_caps);
1731 parser->sink_caps = NULL;
1733 for (i = 0; i < ARRAY_SIZE(parser->input_cache_chunks); i++)
1735 if (parser->input_cache_chunks[i].data)
1737 free(parser->input_cache_chunks[i].data);
1738 parser->input_cache_chunks[i].data = NULL;
1742 return S_OK;
1745 static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
1747 GstElement *element;
1748 int ret;
1750 if (!(element = create_element("decodebin", "base")))
1751 return FALSE;
1753 gst_bin_add(GST_BIN(parser->container), element);
1754 parser->decodebin = element;
1756 if (parser->unlimited_buffering)
1758 g_object_set(parser->decodebin, "max-size-buffers", G_MAXUINT, NULL);
1759 g_object_set(parser->decodebin, "max-size-time", G_MAXUINT64, NULL);
1760 g_object_set(parser->decodebin, "max-size-bytes", G_MAXUINT, NULL);
1763 g_signal_connect(element, "pad-added", G_CALLBACK(pad_added_cb), parser);
1764 g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser);
1765 g_signal_connect(element, "autoplug-continue", G_CALLBACK(autoplug_continue_cb), parser);
1766 g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_select_cb), parser);
1767 g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
1769 parser->their_sink = gst_element_get_static_pad(element, "sink");
1771 pthread_mutex_lock(&parser->mutex);
1772 parser->no_more_pads = false;
1773 pthread_mutex_unlock(&parser->mutex);
1775 if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
1777 GST_ERROR("Failed to link pads, error %d.", ret);
1778 return FALSE;
1781 return TRUE;
1784 static BOOL avi_parser_init_gst(struct wg_parser *parser)
1786 GstElement *element;
1787 int ret;
1789 if (!(element = create_element("avidemux", "good")))
1790 return FALSE;
1792 gst_bin_add(GST_BIN(parser->container), element);
1794 g_signal_connect(element, "pad-added", G_CALLBACK(pad_added_cb), parser);
1795 g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser);
1796 g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
1798 parser->their_sink = gst_element_get_static_pad(element, "sink");
1800 pthread_mutex_lock(&parser->mutex);
1801 parser->no_more_pads = false;
1802 pthread_mutex_unlock(&parser->mutex);
1804 if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
1806 GST_ERROR("Failed to link pads, error %d.", ret);
1807 return FALSE;
1810 return TRUE;
1813 static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser)
1815 struct wg_parser_stream *stream;
1816 GstElement *element;
1817 int ret;
1819 if (!(element = create_element("mpegaudioparse", "good")))
1820 return FALSE;
1822 gst_bin_add(GST_BIN(parser->container), element);
1824 parser->their_sink = gst_element_get_static_pad(element, "sink");
1825 if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
1827 GST_ERROR("Failed to link sink pads, error %d.", ret);
1828 return FALSE;
1831 if (!(stream = create_stream(parser)))
1832 return FALSE;
1834 gst_object_ref(stream->their_src = gst_element_get_static_pad(element, "src"));
1835 if ((ret = gst_pad_link(stream->their_src, stream->my_sink)) < 0)
1837 GST_ERROR("Failed to link source pads, error %d.", ret);
1838 return FALSE;
1840 gst_pad_set_active(stream->my_sink, 1);
1842 parser->no_more_pads = true;
1844 return TRUE;
1847 static BOOL wave_parser_init_gst(struct wg_parser *parser)
1849 struct wg_parser_stream *stream;
1850 GstElement *element;
1851 int ret;
1853 if (!(element = create_element("wavparse", "good")))
1854 return FALSE;
1856 gst_bin_add(GST_BIN(parser->container), element);
1858 parser->their_sink = gst_element_get_static_pad(element, "sink");
1859 if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
1861 GST_ERROR("Failed to link sink pads, error %d.", ret);
1862 return FALSE;
1865 if (!(stream = create_stream(parser)))
1866 return FALSE;
1868 stream->their_src = gst_element_get_static_pad(element, "src");
1869 gst_object_ref(stream->their_src);
1870 if ((ret = gst_pad_link(stream->their_src, stream->my_sink)) < 0)
1872 GST_ERROR("Failed to link source pads, error %d.", ret);
1873 return FALSE;
1875 gst_pad_set_active(stream->my_sink, 1);
1877 parser->no_more_pads = true;
1879 return TRUE;
1882 static NTSTATUS wg_parser_create(void *args)
1884 static const init_gst_cb init_funcs[] =
1886 [WG_PARSER_DECODEBIN] = decodebin_parser_init_gst,
1887 [WG_PARSER_AVIDEMUX] = avi_parser_init_gst,
1888 [WG_PARSER_MPEGAUDIOPARSE] = mpeg_audio_parser_init_gst,
1889 [WG_PARSER_WAVPARSE] = wave_parser_init_gst,
1892 struct wg_parser_create_params *params = args;
1893 struct wg_parser *parser;
1895 if (!(parser = calloc(1, sizeof(*parser))))
1896 return E_OUTOFMEMORY;
1898 pthread_mutex_init(&parser->mutex, NULL);
1899 pthread_cond_init(&parser->init_cond, NULL);
1900 pthread_cond_init(&parser->read_cond, NULL);
1901 pthread_cond_init(&parser->read_done_cond, NULL);
1902 parser->init_gst = init_funcs[params->type];
1903 parser->unlimited_buffering = params->unlimited_buffering;
1904 parser->err_on = params->err_on;
1905 parser->warn_on = params->warn_on;
1906 GST_DEBUG("Created winegstreamer parser %p.", parser);
1907 params->parser = parser;
1908 return S_OK;
1911 static NTSTATUS wg_parser_destroy(void *args)
1913 struct wg_parser *parser = args;
1915 if (parser->bus)
1917 gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL);
1918 gst_object_unref(parser->bus);
1921 pthread_mutex_destroy(&parser->mutex);
1922 pthread_cond_destroy(&parser->init_cond);
1923 pthread_cond_destroy(&parser->read_cond);
1924 pthread_cond_destroy(&parser->read_done_cond);
1926 free(parser);
1927 return S_OK;
1930 const unixlib_entry_t __wine_unix_call_funcs[] =
1932 #define X(name) [unix_ ## name] = name
1933 X(wg_init_gstreamer),
1935 X(wg_parser_create),
1936 X(wg_parser_destroy),
1938 X(wg_parser_connect),
1939 X(wg_parser_disconnect),
1941 X(wg_parser_get_next_read_offset),
1942 X(wg_parser_push_data),
1944 X(wg_parser_get_stream_count),
1945 X(wg_parser_get_stream),
1947 X(wg_parser_stream_get_preferred_format),
1948 X(wg_parser_stream_enable),
1949 X(wg_parser_stream_disable),
1951 X(wg_parser_stream_get_buffer),
1952 X(wg_parser_stream_copy_buffer),
1953 X(wg_parser_stream_release_buffer),
1954 X(wg_parser_stream_notify_qos),
1956 X(wg_parser_stream_get_duration),
1957 X(wg_parser_stream_get_tag),
1958 X(wg_parser_stream_seek),
1960 X(wg_transform_create),
1961 X(wg_transform_destroy),
1962 X(wg_transform_set_output_format),
1964 X(wg_transform_push_data),
1965 X(wg_transform_read_data),
1966 X(wg_transform_get_status),