winegstreamer: Remove unnecessary wg_parser_stream their_src pad.
[wine.git] / dlls / winegstreamer / wg_parser.c
blobef274b1dc2705799004c7387aca0d082006dd4ff
1 /*
2 * GStreamer parser backend
4 * Copyright 2010 Maarten Lankhorst for CodeWeavers
5 * Copyright 2010 Aric Stewart for CodeWeavers
6 * Copyright 2019-2020 Zebediah Figura
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
23 #if 0
24 #pragma makedep unix
25 #endif
27 #include "config.h"
29 #include <assert.h>
30 #include <stdarg.h>
31 #include <stdio.h>
33 #include <gst/gst.h>
34 #include <gst/video/video.h>
35 #include <gst/audio/audio.h>
36 #include <gst/tag/tag.h>
38 #include "ntstatus.h"
39 #define WIN32_NO_STATUS
40 #include "winternl.h"
41 #include "dshow.h"
43 #include "unix_private.h"
45 typedef enum
47 GST_AUTOPLUG_SELECT_TRY,
48 GST_AUTOPLUG_SELECT_EXPOSE,
49 GST_AUTOPLUG_SELECT_SKIP,
50 } GstAutoplugSelectResult;
52 typedef BOOL (*init_gst_cb)(struct wg_parser *parser);
54 struct input_cache_chunk
56 guint64 position;
57 uint8_t *data;
60 struct wg_parser
62 init_gst_cb init_gst;
64 struct wg_parser_stream **streams;
65 unsigned int stream_count;
67 GstElement *container, *decodebin;
68 GstBus *bus;
69 GstPad *my_src;
71 guint64 file_size, start_offset, next_offset, stop_offset;
72 guint64 next_pull_offset;
74 pthread_t push_thread;
76 pthread_mutex_t mutex;
78 pthread_cond_t init_cond;
79 bool no_more_pads, has_duration, error;
80 bool err_on, warn_on;
82 pthread_cond_t read_cond, read_done_cond;
83 struct
85 GstBuffer *buffer;
86 uint64_t offset;
87 uint32_t size;
88 bool done;
89 GstFlowReturn ret;
90 } read_request;
92 bool sink_connected;
94 bool unlimited_buffering;
96 gchar *sink_caps;
98 struct input_cache_chunk input_cache_chunks[4];
100 static const unsigned int input_cache_chunk_size = 512 << 10;
102 struct wg_parser_stream
104 struct wg_parser *parser;
105 uint32_t number;
107 GstPad *my_sink;
108 GstElement *flip, *decodebin;
109 GstSegment segment;
110 struct wg_format preferred_format, current_format, codec_format;
112 pthread_cond_t event_cond, event_empty_cond;
113 GstBuffer *buffer;
114 GstMapInfo map_info;
116 bool flushing, eos, enabled, has_caps, has_tags, has_buffer, no_more_pads;
118 uint64_t duration;
119 gchar *tags[WG_PARSER_TAG_COUNT];
122 static bool format_is_compressed(struct wg_format *format)
124 return format->major_type != WG_MAJOR_TYPE_UNKNOWN
125 && format->major_type != WG_MAJOR_TYPE_VIDEO
126 && format->major_type != WG_MAJOR_TYPE_AUDIO;
129 static NTSTATUS wg_parser_get_stream_count(void *args)
131 struct wg_parser_get_stream_count_params *params = args;
133 params->count = params->parser->stream_count;
134 return S_OK;
137 static NTSTATUS wg_parser_get_stream(void *args)
139 struct wg_parser_get_stream_params *params = args;
141 params->stream = params->parser->streams[params->index];
142 return S_OK;
145 static NTSTATUS wg_parser_get_next_read_offset(void *args)
147 struct wg_parser_get_next_read_offset_params *params = args;
148 struct wg_parser *parser = params->parser;
150 pthread_mutex_lock(&parser->mutex);
152 while (parser->sink_connected && !parser->read_request.size)
153 pthread_cond_wait(&parser->read_cond, &parser->mutex);
155 if (!parser->sink_connected)
157 pthread_mutex_unlock(&parser->mutex);
158 return VFW_E_WRONG_STATE;
161 params->offset = parser->read_request.offset;
162 params->size = parser->read_request.size;
164 pthread_mutex_unlock(&parser->mutex);
165 return S_OK;
168 static NTSTATUS wg_parser_push_data(void *args)
170 const struct wg_parser_push_data_params *params = args;
171 struct wg_parser *parser = params->parser;
172 const void *data = params->data;
173 uint32_t size = params->size;
175 pthread_mutex_lock(&parser->mutex);
177 if (data)
179 if (size)
181 GstMapInfo map_info;
183 /* Note that we don't allocate the buffer until we have a size.
184 * midiparse passes a NULL buffer and a size of UINT_MAX, in an
185 * apparent attempt to read the whole input stream at once. */
186 if (!parser->read_request.buffer)
187 parser->read_request.buffer = gst_buffer_new_and_alloc(size);
188 gst_buffer_map(parser->read_request.buffer, &map_info, GST_MAP_WRITE);
189 memcpy(map_info.data, data, size);
190 gst_buffer_unmap(parser->read_request.buffer, &map_info);
191 parser->read_request.ret = GST_FLOW_OK;
193 else
195 parser->read_request.ret = GST_FLOW_EOS;
198 else
200 parser->read_request.ret = GST_FLOW_ERROR;
202 parser->read_request.done = true;
203 parser->read_request.size = 0;
205 pthread_mutex_unlock(&parser->mutex);
206 pthread_cond_signal(&parser->read_done_cond);
208 return S_OK;
211 static NTSTATUS wg_parser_stream_get_preferred_format(void *args)
213 const struct wg_parser_stream_get_preferred_format_params *params = args;
215 *params->format = params->stream->preferred_format;
216 return S_OK;
219 static NTSTATUS wg_parser_stream_get_codec_format(void *args)
221 struct wg_parser_stream_get_codec_format_params *params = args;
223 *params->format = format_is_compressed(&params->stream->codec_format) ?
224 params->stream->codec_format :
225 params->stream->preferred_format;
226 return S_OK;
229 static NTSTATUS wg_parser_stream_enable(void *args)
231 const struct wg_parser_stream_enable_params *params = args;
232 struct wg_parser_stream *stream = params->stream;
233 const struct wg_format *format = params->format;
234 struct wg_parser *parser = stream->parser;
236 pthread_mutex_lock(&parser->mutex);
238 stream->current_format = *format;
239 stream->enabled = true;
241 pthread_mutex_unlock(&parser->mutex);
243 if (format->major_type == WG_MAJOR_TYPE_VIDEO)
245 bool flip = (format->u.video.height < 0);
247 gst_util_set_object_arg(G_OBJECT(stream->flip), "method", flip ? "vertical-flip" : "none");
250 gst_pad_push_event(stream->my_sink, gst_event_new_reconfigure());
251 return S_OK;
254 static NTSTATUS wg_parser_stream_disable(void *args)
256 struct wg_parser_stream *stream = args;
257 struct wg_parser *parser = stream->parser;
259 pthread_mutex_lock(&parser->mutex);
260 stream->enabled = false;
261 stream->current_format.major_type = WG_MAJOR_TYPE_UNKNOWN;
262 pthread_mutex_unlock(&parser->mutex);
263 pthread_cond_signal(&stream->event_empty_cond);
264 return S_OK;
267 static GstBuffer *wait_parser_stream_buffer(struct wg_parser *parser, struct wg_parser_stream *stream)
269 GstBuffer *buffer = NULL;
271 /* Note that we can both have a buffer and stream->eos, in which case we
272 * must return the buffer. */
274 while (stream->enabled && !(buffer = stream->buffer) && !stream->eos)
275 pthread_cond_wait(&stream->event_cond, &parser->mutex);
277 return buffer;
280 static NTSTATUS wg_parser_stream_get_buffer(void *args)
282 const struct wg_parser_stream_get_buffer_params *params = args;
283 struct wg_parser_buffer *wg_buffer = params->buffer;
284 struct wg_parser_stream *stream = params->stream;
285 struct wg_parser *parser = params->parser;
286 GstBuffer *buffer;
287 unsigned int i;
289 pthread_mutex_lock(&parser->mutex);
291 if (stream)
292 buffer = wait_parser_stream_buffer(parser, stream);
293 else
295 /* Find the earliest buffer by PTS.
297 * Native seems to behave similarly to this with the wm async reader, although our
298 * unit tests show that it's not entirely consistent—some frames are received
299 * slightly out of order. It's possible that one stream is being manually offset
300 * to account for decoding latency.
302 * The behaviour with the wm sync reader, when stream 0 is requested, seems
303 * consistent with this hypothesis, but with a much larger offset—the video
304 * stream seems to be "behind" by about 150 ms.
306 * The main reason for doing this is that the video and audio stream probably
307 * don't have quite the same "frame rate", and we don't want to force one stream
308 * to decode faster just to keep up with the other. Delivering samples in PTS
309 * order should avoid that problem. */
310 GstBuffer *earliest = NULL;
312 for (i = 0; i < parser->stream_count; ++i)
314 if (!(buffer = wait_parser_stream_buffer(parser, parser->streams[i])))
315 continue;
316 /* invalid PTS is GST_CLOCK_TIME_NONE == (guint64)-1, so this will prefer valid timestamps. */
317 if (!earliest || GST_BUFFER_PTS(buffer) < GST_BUFFER_PTS(earliest))
319 stream = parser->streams[i];
320 earliest = buffer;
324 buffer = earliest;
327 if (!buffer)
329 pthread_mutex_unlock(&parser->mutex);
330 return S_FALSE;
333 /* FIXME: Should we use gst_segment_to_stream_time_full()? Under what
334 * circumstances is the stream time not equal to the buffer PTS? Note
335 * that this will need modification to wg_parser_stream_notify_qos() as
336 * well. */
338 if ((wg_buffer->has_pts = GST_BUFFER_PTS_IS_VALID(buffer)))
339 wg_buffer->pts = GST_BUFFER_PTS(buffer) / 100;
340 if ((wg_buffer->has_duration = GST_BUFFER_DURATION_IS_VALID(buffer)))
341 wg_buffer->duration = GST_BUFFER_DURATION(buffer) / 100;
342 wg_buffer->discontinuity = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT);
343 wg_buffer->preroll = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_LIVE);
344 wg_buffer->delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
345 wg_buffer->size = gst_buffer_get_size(buffer);
346 wg_buffer->stream = stream->number;
348 pthread_mutex_unlock(&parser->mutex);
349 return S_OK;
352 static NTSTATUS wg_parser_stream_copy_buffer(void *args)
354 const struct wg_parser_stream_copy_buffer_params *params = args;
355 struct wg_parser_stream *stream = params->stream;
356 struct wg_parser *parser = stream->parser;
357 uint32_t offset = params->offset;
358 uint32_t size = params->size;
360 pthread_mutex_lock(&parser->mutex);
362 if (!stream->buffer)
364 pthread_mutex_unlock(&parser->mutex);
365 return VFW_E_WRONG_STATE;
368 assert(offset < stream->map_info.size);
369 assert(offset + size <= stream->map_info.size);
370 memcpy(params->data, stream->map_info.data + offset, size);
372 pthread_mutex_unlock(&parser->mutex);
373 return S_OK;
376 static NTSTATUS wg_parser_stream_release_buffer(void *args)
378 struct wg_parser_stream *stream = args;
379 struct wg_parser *parser = stream->parser;
381 pthread_mutex_lock(&parser->mutex);
383 assert(stream->buffer);
385 gst_buffer_unmap(stream->buffer, &stream->map_info);
386 gst_buffer_unref(stream->buffer);
387 stream->buffer = NULL;
389 pthread_mutex_unlock(&parser->mutex);
390 pthread_cond_signal(&stream->event_empty_cond);
392 return S_OK;
395 static NTSTATUS wg_parser_stream_get_duration(void *args)
397 struct wg_parser_stream_get_duration_params *params = args;
399 params->duration = params->stream->duration;
400 return S_OK;
403 static NTSTATUS wg_parser_stream_get_tag(void *args)
405 struct wg_parser_stream_get_tag_params *params = args;
406 uint32_t len;
408 if (params->tag >= WG_PARSER_TAG_COUNT)
409 return STATUS_INVALID_PARAMETER;
410 if (!params->stream->tags[params->tag])
411 return STATUS_NOT_FOUND;
412 if ((len = strlen(params->stream->tags[params->tag]) + 1) > *params->size)
414 *params->size = len;
415 return STATUS_BUFFER_TOO_SMALL;
417 memcpy(params->buffer, params->stream->tags[params->tag], len);
418 return STATUS_SUCCESS;
421 static NTSTATUS wg_parser_stream_seek(void *args)
423 GstSeekType start_type = GST_SEEK_TYPE_SET, stop_type = GST_SEEK_TYPE_SET;
424 const struct wg_parser_stream_seek_params *params = args;
425 DWORD start_flags = params->start_flags;
426 DWORD stop_flags = params->stop_flags;
427 GstSeekFlags flags = 0;
429 if (start_flags & AM_SEEKING_SeekToKeyFrame)
430 flags |= GST_SEEK_FLAG_KEY_UNIT;
431 if (start_flags & AM_SEEKING_Segment)
432 flags |= GST_SEEK_FLAG_SEGMENT;
433 if (!(start_flags & AM_SEEKING_NoFlush))
434 flags |= GST_SEEK_FLAG_FLUSH;
436 if ((start_flags & AM_SEEKING_PositioningBitsMask) == AM_SEEKING_NoPositioning)
437 start_type = GST_SEEK_TYPE_NONE;
438 if ((stop_flags & AM_SEEKING_PositioningBitsMask) == AM_SEEKING_NoPositioning)
439 stop_type = GST_SEEK_TYPE_NONE;
441 if (!gst_pad_push_event(params->stream->my_sink, gst_event_new_seek(params->rate, GST_FORMAT_TIME,
442 flags, start_type, params->start_pos * 100, stop_type, params->stop_pos * 100)))
443 GST_ERROR("Failed to seek.\n");
445 return S_OK;
448 static NTSTATUS wg_parser_stream_notify_qos(void *args)
450 const struct wg_parser_stream_notify_qos_params *params = args;
451 struct wg_parser_stream *stream = params->stream;
452 GstClockTime stream_time;
453 GstEvent *event;
455 /* We return timestamps in stream time, i.e. relative to the start of the
456 * file (or other medium), but gst_event_new_qos() expects the timestamp in
457 * running time. */
458 stream_time = gst_segment_to_running_time(&stream->segment, GST_FORMAT_TIME, params->timestamp * 100);
459 if (stream_time == -1)
461 /* This can happen legitimately if the sample falls outside of the
462 * segment bounds. GStreamer elements shouldn't present the sample in
463 * that case, but DirectShow doesn't care. */
464 GST_LOG("Ignoring QoS event.\n");
465 return S_OK;
468 if (!(event = gst_event_new_qos(params->underflow ? GST_QOS_TYPE_UNDERFLOW : GST_QOS_TYPE_OVERFLOW,
469 params->proportion, params->diff * 100, stream_time)))
470 GST_ERROR("Failed to create QOS event.\n");
471 gst_pad_push_event(stream->my_sink, event);
473 return S_OK;
476 static bool parser_no_more_pads(struct wg_parser *parser)
478 unsigned int i;
480 for (i = 0; i < parser->stream_count; ++i)
482 if (!parser->streams[i]->no_more_pads)
483 return false;
486 return parser->no_more_pads;
489 static gboolean autoplug_continue_cb(GstElement * decodebin, GstPad *pad, GstCaps * caps, gpointer user)
491 struct wg_format format;
493 wg_format_from_caps(&format, caps);
495 return !format_is_compressed(&format);
498 static GstAutoplugSelectResult autoplug_select_cb(GstElement *bin, GstPad *pad,
499 GstCaps *caps, GstElementFactory *fact, gpointer user)
501 struct wg_parser *parser = user;
502 const char *name = gst_element_factory_get_longname(fact);
503 const char *klass = gst_element_factory_get_klass(fact);
505 GST_INFO("Using \"%s\".", name);
507 if (strstr(name, "Player protection"))
509 GST_WARNING("Blacklisted a/52 decoder because it only works in Totem.");
510 return GST_AUTOPLUG_SELECT_SKIP;
512 if (!strcmp(name, "Fluendo Hardware Accelerated Video Decoder"))
514 GST_WARNING("Disabled video acceleration since it breaks in wine.");
515 return GST_AUTOPLUG_SELECT_SKIP;
518 if (!parser->sink_caps && strstr(klass, GST_ELEMENT_FACTORY_KLASS_DEMUXER))
519 parser->sink_caps = g_strdup(gst_structure_get_name(gst_caps_get_structure(caps, 0)));
521 return GST_AUTOPLUG_SELECT_TRY;
524 static void no_more_pads_cb(GstElement *element, gpointer user)
526 struct wg_parser *parser = user;
528 GST_DEBUG("parser %p.", parser);
530 pthread_mutex_lock(&parser->mutex);
531 parser->no_more_pads = true;
532 pthread_mutex_unlock(&parser->mutex);
533 pthread_cond_signal(&parser->init_cond);
536 static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
538 struct wg_parser_stream *stream = gst_pad_get_element_private(pad);
539 struct wg_parser *parser = stream->parser;
541 GST_LOG("stream %p, type \"%s\".", stream, GST_EVENT_TYPE_NAME(event));
543 switch (event->type)
545 case GST_EVENT_SEGMENT:
546 pthread_mutex_lock(&parser->mutex);
547 if (stream->enabled)
549 const GstSegment *segment;
551 gst_event_parse_segment(event, &segment);
553 if (segment->format != GST_FORMAT_TIME)
555 pthread_mutex_unlock(&parser->mutex);
556 GST_FIXME("Unhandled format \"%s\".", gst_format_get_name(segment->format));
557 break;
560 gst_segment_copy_into(segment, &stream->segment);
562 pthread_mutex_unlock(&parser->mutex);
563 break;
565 case GST_EVENT_EOS:
566 pthread_mutex_lock(&parser->mutex);
567 stream->eos = true;
568 if (stream->enabled)
569 pthread_cond_signal(&stream->event_cond);
570 else
571 pthread_cond_signal(&parser->init_cond);
572 pthread_mutex_unlock(&parser->mutex);
573 break;
575 case GST_EVENT_FLUSH_START:
576 pthread_mutex_lock(&parser->mutex);
578 if (stream->enabled)
580 stream->flushing = true;
581 pthread_cond_signal(&stream->event_empty_cond);
583 if (stream->buffer)
585 gst_buffer_unmap(stream->buffer, &stream->map_info);
586 gst_buffer_unref(stream->buffer);
587 stream->buffer = NULL;
591 pthread_mutex_unlock(&parser->mutex);
592 break;
594 case GST_EVENT_FLUSH_STOP:
596 gboolean reset_time;
598 gst_event_parse_flush_stop(event, &reset_time);
600 if (reset_time)
601 gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED);
603 pthread_mutex_lock(&parser->mutex);
605 stream->eos = false;
606 if (stream->enabled)
607 stream->flushing = false;
609 pthread_mutex_unlock(&parser->mutex);
610 break;
613 case GST_EVENT_CAPS:
615 GstCaps *caps;
617 gst_event_parse_caps(event, &caps);
618 pthread_mutex_lock(&parser->mutex);
619 wg_format_from_caps(&stream->preferred_format, caps);
620 stream->has_caps = true;
621 pthread_mutex_unlock(&parser->mutex);
622 pthread_cond_signal(&parser->init_cond);
623 break;
626 case GST_EVENT_TAG:
627 pthread_mutex_lock(&parser->mutex);
628 stream->has_tags = true;
629 pthread_cond_signal(&parser->init_cond);
630 pthread_mutex_unlock(&parser->mutex);
631 break;
633 default:
634 GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
636 gst_event_unref(event);
637 return TRUE;
640 static GstFlowReturn sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer)
642 struct wg_parser_stream *stream = gst_pad_get_element_private(pad);
643 struct wg_parser *parser = stream->parser;
645 GST_LOG("stream %p, buffer %p.", stream, buffer);
647 pthread_mutex_lock(&parser->mutex);
649 if (!stream->has_buffer)
651 stream->has_buffer = true;
652 pthread_cond_signal(&parser->init_cond);
655 /* Allow this buffer to be flushed by GStreamer. We are effectively
656 * implementing a queue object here. */
658 while (stream->enabled && !stream->flushing && stream->buffer)
659 pthread_cond_wait(&stream->event_empty_cond, &parser->mutex);
661 if (!stream->enabled)
663 pthread_mutex_unlock(&parser->mutex);
664 gst_buffer_unref(buffer);
665 return GST_FLOW_OK;
668 if (stream->flushing)
670 pthread_mutex_unlock(&parser->mutex);
671 GST_DEBUG("Stream is flushing; discarding buffer.");
672 gst_buffer_unref(buffer);
673 return GST_FLOW_FLUSHING;
676 if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ))
678 pthread_mutex_unlock(&parser->mutex);
679 GST_ERROR("Failed to map buffer.\n");
680 gst_buffer_unref(buffer);
681 return GST_FLOW_ERROR;
684 stream->buffer = buffer;
686 pthread_mutex_unlock(&parser->mutex);
687 pthread_cond_signal(&stream->event_cond);
689 /* The chain callback is given a reference to the buffer. Transfer that
690 * reference to the stream object, which will release it in
691 * wg_parser_stream_release_buffer(). */
693 GST_LOG("Buffer queued.");
694 return GST_FLOW_OK;
697 static gboolean sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
699 struct wg_parser_stream *stream = gst_pad_get_element_private(pad);
700 struct wg_parser *parser = stream->parser;
702 GST_LOG("stream %p, type \"%s\".", stream, gst_query_type_get_name(query->type));
704 switch (query->type)
706 case GST_QUERY_CAPS:
708 GstCaps *caps, *filter, *temp;
709 gchar *str;
710 gsize i;
712 gst_query_parse_caps(query, &filter);
714 pthread_mutex_lock(&parser->mutex);
715 caps = wg_format_to_caps(&stream->current_format);
716 pthread_mutex_unlock(&parser->mutex);
718 if (!caps)
719 return FALSE;
721 /* Clear some fields that shouldn't prevent us from connecting. */
722 for (i = 0; i < gst_caps_get_size(caps); ++i)
723 gst_structure_remove_fields(gst_caps_get_structure(caps, i),
724 "framerate", "pixel-aspect-ratio", "colorimetry", "chroma-site", NULL);
726 str = gst_caps_to_string(caps);
727 GST_LOG("Stream caps are \"%s\".", str);
728 g_free(str);
730 if (filter)
732 temp = gst_caps_intersect(caps, filter);
733 gst_caps_unref(caps);
734 caps = temp;
737 gst_query_set_caps_result(query, caps);
738 gst_caps_unref(caps);
739 return TRUE;
742 case GST_QUERY_ACCEPT_CAPS:
744 struct wg_format format;
745 gboolean ret = TRUE;
746 GstCaps *caps;
748 pthread_mutex_lock(&parser->mutex);
750 if (stream->current_format.major_type == WG_MAJOR_TYPE_UNKNOWN)
752 pthread_mutex_unlock(&parser->mutex);
753 gst_query_set_accept_caps_result(query, TRUE);
754 return TRUE;
757 gst_query_parse_accept_caps(query, &caps);
758 wg_format_from_caps(&format, caps);
759 ret = wg_format_compare(&format, &stream->current_format);
761 pthread_mutex_unlock(&parser->mutex);
763 if (!ret && gst_debug_category_get_threshold(GST_CAT_DEFAULT) >= GST_LEVEL_WARNING)
765 gchar *str = gst_caps_to_string(caps);
766 GST_WARNING("Rejecting caps \"%s\".", str);
767 g_free(str);
769 gst_query_set_accept_caps_result(query, ret);
770 return TRUE;
773 default:
774 return gst_pad_query_default (pad, parent, query);
778 static struct wg_parser_stream *create_stream(struct wg_parser *parser)
780 struct wg_parser_stream *stream, **new_array;
781 char pad_name[19];
783 if (!(new_array = realloc(parser->streams, (parser->stream_count + 1) * sizeof(*parser->streams))))
784 return NULL;
785 parser->streams = new_array;
787 if (!(stream = calloc(1, sizeof(*stream))))
788 return NULL;
790 gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED);
792 stream->parser = parser;
793 stream->number = parser->stream_count;
794 stream->no_more_pads = true;
795 stream->current_format.major_type = WG_MAJOR_TYPE_UNKNOWN;
796 pthread_cond_init(&stream->event_cond, NULL);
797 pthread_cond_init(&stream->event_empty_cond, NULL);
799 sprintf(pad_name, "qz_sink_%u", parser->stream_count);
800 stream->my_sink = gst_pad_new(pad_name, GST_PAD_SINK);
801 gst_pad_set_element_private(stream->my_sink, stream);
802 gst_pad_set_chain_function(stream->my_sink, sink_chain_cb);
803 gst_pad_set_event_function(stream->my_sink, sink_event_cb);
804 gst_pad_set_query_function(stream->my_sink, sink_query_cb);
806 parser->streams[parser->stream_count++] = stream;
807 return stream;
810 static void free_stream(struct wg_parser_stream *stream)
812 unsigned int i;
814 gst_object_unref(stream->my_sink);
816 if (stream->buffer)
818 gst_buffer_unmap(stream->buffer, &stream->map_info);
819 gst_buffer_unref(stream->buffer);
820 stream->buffer = NULL;
823 pthread_cond_destroy(&stream->event_cond);
824 pthread_cond_destroy(&stream->event_empty_cond);
826 for (i = 0; i < ARRAY_SIZE(stream->tags); ++i)
828 if (stream->tags[i])
829 g_free(stream->tags[i]);
831 free(stream);
834 static bool stream_create_post_processing_elements(GstPad *pad, struct wg_parser_stream *stream)
836 GstElement *element = NULL, *first = NULL, *last = NULL;
837 struct wg_parser *parser = stream->parser;
838 const char *name;
839 GstCaps *caps;
840 int ret;
842 caps = gst_pad_query_caps(pad, NULL);
843 name = gst_structure_get_name(gst_caps_get_structure(caps, 0));
844 gst_caps_unref(caps);
846 if (!strcmp(name, "video/x-raw"))
848 /* DirectShow can express interlaced video, but downstream filters can't
849 * necessarily consume it. In particular, the video renderer can't. */
850 if (!(element = create_element("deinterlace", "good"))
851 || !append_element(parser->container, element, &first, &last))
852 return false;
854 /* decodebin considers many YUV formats to be "raw", but some quartz
855 * filters can't handle those. Also, videoflip can't handle all "raw"
856 * formats either. Add a videoconvert to swap color spaces. */
857 if (!(element = create_element("videoconvert", "base"))
858 || !append_element(parser->container, element, &first, &last))
859 return false;
861 /* GStreamer outputs RGB video top-down, but DirectShow expects bottom-up. */
862 if (!(element = create_element("videoflip", "good"))
863 || !append_element(parser->container, element, &first, &last))
864 return false;
865 stream->flip = element;
867 /* videoflip does not support 15 and 16-bit RGB so add a second videoconvert
868 * to do the final conversion. */
869 if (!(element = create_element("videoconvert", "base"))
870 || !append_element(parser->container, element, &first, &last))
871 return false;
873 if (!link_src_to_element(pad, first) || !link_element_to_sink(last, stream->my_sink))
874 return false;
876 else if (!strcmp(name, "audio/x-raw"))
878 /* Currently our dsound can't handle 64-bit formats or all
879 * surround-sound configurations. Native dsound can't always handle
880 * 64-bit formats either. Add an audioconvert to allow changing bit
881 * depth and channel count. */
882 if (!(element = create_element("audioconvert", "base"))
883 || !append_element(parser->container, element, &first, &last))
884 return false;
886 if (!link_src_to_element(pad, first) || !link_element_to_sink(last, stream->my_sink))
887 return false;
889 else if ((ret = gst_pad_link(pad, stream->my_sink)) < 0)
891 GST_ERROR("Failed to link decodebin source pad to our sink pad, error %s.",
892 gst_pad_link_get_name(ret));
893 return false;
896 return true;
899 static void stream_decodebin_no_more_pads_cb(GstElement *element, gpointer user)
901 struct wg_parser_stream *stream = user;
902 struct wg_parser *parser = stream->parser;
904 GST_DEBUG("stream %p, parser %p, element %p.", stream, parser, element);
906 pthread_mutex_lock(&parser->mutex);
907 stream->no_more_pads = true;
908 pthread_mutex_unlock(&parser->mutex);
909 pthread_cond_signal(&parser->init_cond);
912 static void stream_decodebin_pad_added_cb(GstElement *element, GstPad *pad, gpointer user)
914 struct wg_parser_stream *stream = user;
915 struct wg_parser *parser = stream->parser;
917 GST_LOG("stream %p, parser %p, element %p, pad %p.", stream, parser, element, pad);
919 if (gst_pad_is_linked(pad))
920 return;
922 if (!stream_create_post_processing_elements(pad, stream))
923 return;
924 gst_pad_set_active(stream->my_sink, 1);
927 static bool stream_decodebin_create(struct wg_parser_stream *stream)
929 struct wg_parser *parser = stream->parser;
931 GST_LOG("stream %p, parser %p.", stream, parser);
933 if (!(stream->decodebin = create_element("decodebin", "base")))
934 return false;
935 gst_bin_add(GST_BIN(parser->container), stream->decodebin);
937 g_signal_connect(stream->decodebin, "pad-added", G_CALLBACK(stream_decodebin_pad_added_cb), stream);
938 g_signal_connect(stream->decodebin, "autoplug-select", G_CALLBACK(autoplug_select_cb), stream);
939 g_signal_connect(stream->decodebin, "no-more-pads", G_CALLBACK(stream_decodebin_no_more_pads_cb), stream);
941 pthread_mutex_lock(&parser->mutex);
942 stream->no_more_pads = false;
943 pthread_mutex_unlock(&parser->mutex);
944 gst_element_sync_state_with_parent(stream->decodebin);
946 GST_LOG("Created stream decodebin %p for %u.", stream->decodebin, stream->number);
948 return true;
951 static void pad_added_cb(GstElement *element, GstPad *pad, gpointer user)
953 struct wg_parser_stream *stream;
954 struct wg_parser *parser = user;
955 GstCaps *caps;
957 GST_LOG("parser %p, element %p, pad %p.", parser, element, pad);
959 if (gst_pad_is_linked(pad))
960 return;
962 if (!(stream = create_stream(parser)))
963 return;
965 caps = gst_pad_query_caps(pad, NULL);
966 wg_format_from_caps(&stream->codec_format, caps);
967 gst_caps_unref(caps);
969 /* For compressed stream, create an extra decodebin to decode it. */
970 if (format_is_compressed(&stream->codec_format))
972 if (!stream_decodebin_create(stream))
974 GST_ERROR("Failed to create decodebin for stream %u.", stream->number);
975 return;
978 if (!link_src_to_element(pad, stream->decodebin))
979 GST_ERROR("Failed to link pad %p to stream decodebin %p for stream %u.",
980 pad, stream->decodebin, stream->number);
982 return;
985 if (!stream_create_post_processing_elements(pad, stream))
986 return;
987 gst_pad_set_active(stream->my_sink, 1);
990 static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user)
992 struct wg_parser *parser = user;
993 bool done = false;
994 unsigned int i;
995 char *name;
997 GST_LOG("parser %p, element %p, pad %p.", parser, element, pad);
999 for (i = 0; i < parser->stream_count; ++i)
1001 struct wg_parser_stream *stream = parser->streams[i];
1002 GstPad *stream_decodebin_sink_peer = NULL;
1003 GstPad *stream_decodebin_sink = NULL;
1005 if (stream->decodebin)
1007 stream_decodebin_sink = gst_element_get_static_pad(stream->decodebin, "sink");
1008 stream_decodebin_sink_peer = gst_pad_get_peer(stream_decodebin_sink);
1011 if (stream_decodebin_sink_peer == pad)
1013 gst_pad_unlink(pad, stream_decodebin_sink);
1014 done = true;
1017 if (stream_decodebin_sink_peer)
1018 gst_object_unref(stream_decodebin_sink_peer);
1019 if (stream_decodebin_sink)
1020 gst_object_unref(stream_decodebin_sink);
1022 if (done)
1023 return;
1026 name = gst_pad_get_name(pad);
1027 GST_WARNING("No pin matching pad \"%s\" found.", name);
1028 g_free(name);
1031 static GstFlowReturn issue_read_request(struct wg_parser *parser, guint64 offset, guint size, GstBuffer **buffer)
1033 GstFlowReturn ret;
1035 pthread_mutex_lock(&parser->mutex);
1037 assert(!parser->read_request.size);
1038 parser->read_request.buffer = *buffer;
1039 parser->read_request.offset = offset;
1040 parser->read_request.size = size;
1041 parser->read_request.done = false;
1042 pthread_cond_signal(&parser->read_cond);
1044 /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect
1045 * the upstream pin to flush if necessary. We should never be blocked on
1046 * read_thread() not running. */
1048 while (!parser->read_request.done)
1049 pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
1051 *buffer = parser->read_request.buffer;
1052 ret = parser->read_request.ret;
1054 pthread_mutex_unlock(&parser->mutex);
1056 GST_LOG("Request returned %s.", gst_flow_get_name(ret));
1058 return ret;
1061 static struct input_cache_chunk * get_cache_entry(struct wg_parser *parser, guint64 position)
1063 struct input_cache_chunk chunk;
1064 unsigned int i;
1066 for (i = 0; i < ARRAY_SIZE(parser->input_cache_chunks); i++)
1068 chunk = parser->input_cache_chunks[i];
1070 if (chunk.data && position == chunk.position)
1072 if (i != 0)
1074 memmove(&parser->input_cache_chunks[1], &parser->input_cache_chunks[0], i * sizeof(chunk));
1075 parser->input_cache_chunks[0] = chunk;
1078 return &parser->input_cache_chunks[0];
1082 return NULL;
1085 static GstFlowReturn read_cached_chunk(struct wg_parser *parser, guint64 chunk_position, unsigned int chunk_offset, GstBuffer *buffer, guint64 buffer_offset)
1087 struct input_cache_chunk *chunk;
1088 GstBuffer *chunk_buffer;
1089 void *chunk_data;
1090 GstFlowReturn ret;
1092 if ((chunk = get_cache_entry(parser, chunk_position)))
1094 if (!!gst_buffer_fill(buffer, buffer_offset, chunk->data + chunk_offset, input_cache_chunk_size - chunk_offset))
1095 return GST_FLOW_OK;
1096 else
1097 return GST_FLOW_ERROR;
1100 chunk = &parser->input_cache_chunks[ ARRAY_SIZE(parser->input_cache_chunks) - 1 ];
1102 if (!(chunk_data = chunk->data))
1103 chunk_data = malloc(input_cache_chunk_size);
1105 chunk_buffer = gst_buffer_new_wrapped_full(0, chunk_data, input_cache_chunk_size, 0, input_cache_chunk_size, NULL, NULL);
1106 ret = issue_read_request(parser, chunk_position, input_cache_chunk_size, &chunk_buffer);
1107 gst_buffer_unref(chunk_buffer);
1109 if (ret != GST_FLOW_OK)
1111 if (!chunk->data)
1112 free(chunk_data);
1113 return ret;
1116 memmove(&parser->input_cache_chunks[1], &parser->input_cache_chunks[0], (ARRAY_SIZE(parser->input_cache_chunks) - 1) * sizeof(*chunk));
1117 parser->input_cache_chunks[0].data = chunk_data;
1118 parser->input_cache_chunks[0].position = chunk_position;
1120 chunk = &parser->input_cache_chunks[0];
1121 if (!!gst_buffer_fill(buffer, buffer_offset, chunk->data + chunk_offset, input_cache_chunk_size - chunk_offset))
1122 return GST_FLOW_OK;
1123 else
1124 return GST_FLOW_ERROR;
1127 static GstFlowReturn read_input_cache(struct wg_parser *parser, guint64 offset, guint size, GstBuffer **buffer)
1129 unsigned int i, chunk_count, chunk_offset, buffer_offset = 0;
1130 GstBuffer *working_buffer;
1131 guint64 chunk_position;
1132 GstFlowReturn ret;
1134 working_buffer = *buffer;
1135 if (!working_buffer)
1136 working_buffer = gst_buffer_new_and_alloc(size);
1138 chunk_position = offset - (offset % input_cache_chunk_size);
1139 chunk_count = (offset + size + input_cache_chunk_size - chunk_position - 1) / input_cache_chunk_size;
1140 chunk_offset = offset - chunk_position;
1142 for (i = 0; i < chunk_count; i++)
1144 if ((ret = read_cached_chunk(parser, chunk_position, chunk_offset, working_buffer, buffer_offset)) != GST_FLOW_OK)
1146 if (!*buffer)
1147 gst_buffer_unref(working_buffer);
1148 return ret;
1151 chunk_position += input_cache_chunk_size;
1152 buffer_offset += input_cache_chunk_size - chunk_offset;
1153 chunk_offset = 0;
1156 *buffer = working_buffer;
1157 return GST_FLOW_OK;
1160 static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
1161 guint64 offset, guint size, GstBuffer **buffer)
1163 struct wg_parser *parser = gst_pad_get_element_private(pad);
1165 GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, size %u, buffer %p.", pad, offset, size, *buffer);
1167 if (offset == GST_BUFFER_OFFSET_NONE)
1168 offset = parser->next_pull_offset;
1169 parser->next_pull_offset = offset + size;
1171 if (!size)
1173 /* asfreader occasionally asks for zero bytes. gst_buffer_map() will
1174 * return NULL in this case. Avoid confusing the read thread by asking
1175 * it for zero bytes. */
1176 if (!*buffer)
1177 *buffer = gst_buffer_new_and_alloc(0);
1178 gst_buffer_set_size(*buffer, 0);
1179 GST_LOG("Returning empty buffer.");
1180 return GST_FLOW_OK;
1183 if (size >= input_cache_chunk_size || sizeof(void*) == 4)
1184 return issue_read_request(parser, offset, size, buffer);
1186 if (offset >= parser->file_size)
1187 return GST_FLOW_EOS;
1189 if ((offset + size) >= parser->file_size)
1190 size = parser->file_size - offset;
1192 return read_input_cache(parser, offset, size, buffer);
1195 static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
1197 struct wg_parser *parser = gst_pad_get_element_private(pad);
1198 GstFormat format;
1200 GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query));
1202 switch (GST_QUERY_TYPE(query))
1204 case GST_QUERY_DURATION:
1205 gst_query_parse_duration(query, &format, NULL);
1206 if (format == GST_FORMAT_PERCENT)
1208 gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX);
1209 return TRUE;
1211 else if (format == GST_FORMAT_BYTES)
1213 gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size);
1214 return TRUE;
1216 return FALSE;
1218 case GST_QUERY_SEEKING:
1219 gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1220 if (format != GST_FORMAT_BYTES)
1222 GST_WARNING("Cannot seek using format \"%s\".", gst_format_get_name(format));
1223 return FALSE;
1225 gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size);
1226 return TRUE;
1228 case GST_QUERY_SCHEDULING:
1229 gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1230 gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH);
1231 gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL);
1232 return TRUE;
1234 default:
1235 GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query));
1236 return FALSE;
1240 static void *push_data(void *arg)
1242 struct wg_parser *parser = arg;
1243 GstBuffer *buffer;
1244 guint max_size;
1246 GST_DEBUG("Starting push thread.");
1248 if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL)))
1250 GST_ERROR("Failed to allocate memory.");
1251 return NULL;
1254 max_size = parser->stop_offset ? parser->stop_offset : parser->file_size;
1256 for (;;)
1258 ULONG size;
1259 int ret;
1261 if (parser->next_offset >= max_size)
1262 break;
1263 size = min(16384, max_size - parser->next_offset);
1265 if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0)
1267 GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret));
1268 break;
1271 parser->next_offset += size;
1273 buffer->duration = buffer->pts = -1;
1274 if ((ret = gst_pad_push(parser->my_src, buffer)) < 0)
1276 GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret));
1277 break;
1281 gst_buffer_unref(buffer);
1283 gst_pad_push_event(parser->my_src, gst_event_new_eos());
1285 GST_DEBUG("Stopping push thread.");
1287 return NULL;
1290 static gboolean activate_push(GstPad *pad, gboolean activate)
1292 struct wg_parser *parser = gst_pad_get_element_private(pad);
1294 if (!activate)
1296 if (parser->push_thread)
1298 pthread_join(parser->push_thread, NULL);
1299 parser->push_thread = 0;
1302 else if (!parser->push_thread)
1304 int ret;
1306 if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser)))
1308 GST_ERROR("Failed to create push thread: %s", strerror(errno));
1309 parser->push_thread = 0;
1310 return FALSE;
1313 return TRUE;
1316 static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate)
1318 struct wg_parser *parser = gst_pad_get_element_private(pad);
1320 GST_DEBUG("%s source pad for parser %p in %s mode.",
1321 activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode));
1323 switch (mode)
1325 case GST_PAD_MODE_PULL:
1326 return TRUE;
1327 case GST_PAD_MODE_PUSH:
1328 return activate_push(pad, activate);
1329 case GST_PAD_MODE_NONE:
1330 break;
1332 return FALSE;
1335 static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user)
1337 struct wg_parser *parser = user;
1338 gchar *dbg_info = NULL;
1339 GError *err = NULL;
1341 GST_DEBUG("parser %p, message type %s.", parser, GST_MESSAGE_TYPE_NAME(msg));
1343 switch (msg->type)
1345 case GST_MESSAGE_ERROR:
1346 gst_message_parse_error(msg, &err, &dbg_info);
1347 if (parser->err_on)
1349 fprintf(stderr, "winegstreamer error: %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
1350 fprintf(stderr, "winegstreamer error: %s: %s\n", GST_OBJECT_NAME(msg->src), dbg_info);
1352 g_error_free(err);
1353 g_free(dbg_info);
1354 pthread_mutex_lock(&parser->mutex);
1355 parser->error = true;
1356 pthread_mutex_unlock(&parser->mutex);
1357 pthread_cond_signal(&parser->init_cond);
1358 break;
1360 case GST_MESSAGE_WARNING:
1361 gst_message_parse_warning(msg, &err, &dbg_info);
1362 if (parser->warn_on)
1364 fprintf(stderr, "winegstreamer warning: %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
1365 fprintf(stderr, "winegstreamer warning: %s: %s\n", GST_OBJECT_NAME(msg->src), dbg_info);
1367 g_error_free(err);
1368 g_free(dbg_info);
1369 break;
1371 case GST_MESSAGE_DURATION_CHANGED:
1372 pthread_mutex_lock(&parser->mutex);
1373 parser->has_duration = true;
1374 pthread_mutex_unlock(&parser->mutex);
1375 pthread_cond_signal(&parser->init_cond);
1376 break;
1378 default:
1379 break;
1381 gst_message_unref(msg);
1382 return GST_BUS_DROP;
1385 static gboolean src_perform_seek(struct wg_parser *parser, GstEvent *event)
1387 BOOL thread = !!parser->push_thread;
1388 GstSeekType cur_type, stop_type;
1389 GstFormat seek_format;
1390 GstEvent *flush_event;
1391 GstSeekFlags flags;
1392 gint64 cur, stop;
1393 guint32 seqnum;
1394 gdouble rate;
1396 gst_event_parse_seek(event, &rate, &seek_format, &flags,
1397 &cur_type, &cur, &stop_type, &stop);
1399 if (seek_format != GST_FORMAT_BYTES)
1401 GST_FIXME("Unhandled format \"%s\".", gst_format_get_name(seek_format));
1402 return FALSE;
1405 seqnum = gst_event_get_seqnum(event);
1407 /* send flush start */
1408 if (flags & GST_SEEK_FLAG_FLUSH)
1410 flush_event = gst_event_new_flush_start();
1411 gst_event_set_seqnum(flush_event, seqnum);
1412 gst_pad_push_event(parser->my_src, flush_event);
1413 if (thread)
1414 gst_pad_set_active(parser->my_src, 1);
1417 parser->next_offset = parser->start_offset = cur;
1419 /* and prepare to continue streaming */
1420 if (flags & GST_SEEK_FLAG_FLUSH)
1422 flush_event = gst_event_new_flush_stop(TRUE);
1423 gst_event_set_seqnum(flush_event, seqnum);
1424 gst_pad_push_event(parser->my_src, flush_event);
1425 if (thread)
1426 gst_pad_set_active(parser->my_src, 1);
1429 return TRUE;
1432 static gboolean src_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
1434 struct wg_parser *parser = gst_pad_get_element_private(pad);
1435 gboolean ret = TRUE;
1437 GST_LOG("parser %p, type \"%s\".", parser, GST_EVENT_TYPE_NAME(event));
1439 switch (event->type)
1441 case GST_EVENT_SEEK:
1442 ret = src_perform_seek(parser, event);
1443 break;
1445 case GST_EVENT_FLUSH_START:
1446 case GST_EVENT_FLUSH_STOP:
1447 case GST_EVENT_QOS:
1448 case GST_EVENT_RECONFIGURE:
1449 break;
1451 default:
1452 GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
1453 ret = FALSE;
1454 break;
1456 gst_event_unref(event);
1457 return ret;
1460 static void query_tags(struct wg_parser_stream *stream)
1462 GstPad *peer = gst_pad_get_peer(stream->my_sink);
1463 const gchar *struct_name;
1464 GstEvent *tag_event;
1465 guint i, j;
1467 stream->tags[WG_PARSER_TAG_NAME] = NULL;
1468 stream->tags[WG_PARSER_TAG_LANGUAGE] = NULL;
1470 i = 0;
1471 while ((tag_event = gst_pad_get_sticky_event(peer, GST_EVENT_TAG, i++)))
1473 GstTagList *tag_list;
1475 gst_event_parse_tag(tag_event, &tag_list);
1477 if (!stream->tags[WG_PARSER_TAG_NAME])
1479 /* Extract stream name from Quick Time demuxer private tag where it puts unrecognized chunks. */
1480 const GValue *val;
1481 GstSample *sample;
1482 GstBuffer *buf;
1483 gsize size;
1484 guint tag_count = gst_tag_list_get_tag_size(tag_list, "private-qt-tag");
1486 for (j = 0; j < tag_count; ++j)
1488 if (!(val = gst_tag_list_get_value_index(tag_list, "private-qt-tag", j)))
1489 continue;
1490 if (!GST_VALUE_HOLDS_SAMPLE(val) || !(sample = gst_value_get_sample(val)))
1491 continue;
1492 struct_name = gst_structure_get_name(gst_sample_get_info(sample));
1493 if (!struct_name || strcmp(struct_name, "application/x-gst-qt-name-tag"))
1494 continue;
1495 if (!(buf = gst_sample_get_buffer(sample)))
1496 continue;
1497 if ((size = gst_buffer_get_size(buf)) < 8)
1498 continue;
1499 size -= 8;
1500 if (!(stream->tags[WG_PARSER_TAG_NAME] = g_malloc(size + 1)))
1501 continue;
1502 if (gst_buffer_extract(buf, 8, stream->tags[WG_PARSER_TAG_NAME], size) != size)
1504 g_free(stream->tags[WG_PARSER_TAG_NAME]);
1505 stream->tags[WG_PARSER_TAG_NAME] = NULL;
1506 continue;
1508 stream->tags[WG_PARSER_TAG_NAME][size] = 0;
1512 if (!stream->tags[WG_PARSER_TAG_LANGUAGE])
1514 gchar *lang_code = NULL;
1516 gst_tag_list_get_string(tag_list, GST_TAG_LANGUAGE_CODE, &lang_code);
1517 if (stream->parser->sink_caps && !strcmp(stream->parser->sink_caps, "video/quicktime"))
1519 /* For QuickTime media, we convert the language tags to ISO 639-1. */
1520 const gchar *lang_code_iso_639_1 = lang_code ? gst_tag_get_language_code_iso_639_1(lang_code) : NULL;
1521 stream->tags[WG_PARSER_TAG_LANGUAGE] = lang_code_iso_639_1 ? g_strdup(lang_code_iso_639_1) : NULL;
1522 g_free(lang_code);
1524 else
1525 stream->tags[WG_PARSER_TAG_LANGUAGE] = lang_code;
1528 gst_event_unref(tag_event);
1530 gst_object_unref(peer);
1533 static NTSTATUS wg_parser_connect(void *args)
1535 GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src",
1536 GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
1537 const struct wg_parser_connect_params *params = args;
1538 struct wg_parser *parser = params->parser;
1539 unsigned int i;
1540 int ret;
1542 parser->file_size = params->file_size;
1543 parser->sink_connected = true;
1545 if (!parser->bus)
1547 parser->bus = gst_bus_new();
1548 gst_bus_set_sync_handler(parser->bus, bus_handler_cb, parser, NULL);
1551 parser->container = gst_bin_new(NULL);
1552 gst_element_set_bus(parser->container, parser->bus);
1554 parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src");
1555 gst_pad_set_getrange_function(parser->my_src, src_getrange_cb);
1556 gst_pad_set_query_function(parser->my_src, src_query_cb);
1557 gst_pad_set_activatemode_function(parser->my_src, src_activate_mode_cb);
1558 gst_pad_set_event_function(parser->my_src, src_event_cb);
1559 gst_pad_set_element_private(parser->my_src, parser);
1561 parser->start_offset = parser->next_offset = parser->stop_offset = 0;
1562 parser->next_pull_offset = 0;
1563 parser->error = false;
1565 if (!parser->init_gst(parser))
1566 goto out;
1568 gst_element_set_state(parser->container, GST_STATE_PAUSED);
1569 ret = gst_element_get_state(parser->container, NULL, NULL, -1);
1570 if (ret == GST_STATE_CHANGE_FAILURE)
1572 GST_ERROR("Failed to play stream.\n");
1573 goto out;
1576 pthread_mutex_lock(&parser->mutex);
1578 while (!parser_no_more_pads(parser) && !parser->error)
1579 pthread_cond_wait(&parser->init_cond, &parser->mutex);
1580 if (parser->error)
1582 pthread_mutex_unlock(&parser->mutex);
1583 goto out;
1586 for (i = 0; i < parser->stream_count; ++i)
1588 struct wg_parser_stream *stream = parser->streams[i];
1589 gint64 duration;
1591 /* If we received a buffer, waiting for tags or caps does not make sense anymore. */
1592 while ((!stream->has_caps || !stream->has_tags) && !parser->error && !stream->has_buffer)
1593 pthread_cond_wait(&parser->init_cond, &parser->mutex);
1595 /* GStreamer doesn't actually provide any guarantees about when duration
1596 * is available, even for seekable streams. It's basically built for
1597 * applications that don't care, e.g. movie players that can display
1598 * a duration once it's available, and update it visually if a better
1599 * estimate is found. This doesn't really match well with DirectShow or
1600 * Media Foundation, which both expect duration to be available
1601 * immediately on connecting, so we have to use some complex heuristics
1602 * to try to actually get a usable duration.
1604 * Some elements (avidemux, wavparse, qtdemux) record duration almost
1605 * immediately, before fixing caps. Such elements don't send
1606 * duration-changed messages. Therefore always try querying duration
1607 * after caps have been found.
1609 * Some elements (mpegaudioparse) send duration-changed. In the case of
1610 * a mp3 stream without seek tables it will not be sent immediately, but
1611 * only after enough frames have been parsed to form an estimate. They
1612 * may send it multiple times with increasingly accurate estimates, but
1613 * unfortunately we have no way of knowing whether another estimate will
1614 * be sent, so we always take the first one. We assume that if the
1615 * duration is not immediately available then the element will always
1616 * send duration-changed.
1619 for (;;)
1621 if (parser->error)
1623 pthread_mutex_unlock(&parser->mutex);
1624 goto out;
1626 if (gst_pad_peer_query_duration(stream->my_sink, GST_FORMAT_TIME, &duration))
1628 stream->duration = duration / 100;
1629 break;
1632 if (stream->eos)
1634 stream->duration = 0;
1635 GST_WARNING("Failed to query duration.\n");
1636 break;
1639 /* Elements based on GstBaseParse send duration-changed before
1640 * actually updating the duration in GStreamer versions prior
1641 * to 1.17.1. See <gstreamer.git:d28e0b4147fe7073b2>. So after
1642 * receiving duration-changed we have to continue polling until
1643 * the query succeeds. */
1644 if (parser->has_duration)
1646 pthread_mutex_unlock(&parser->mutex);
1647 g_usleep(10000);
1648 pthread_mutex_lock(&parser->mutex);
1650 else
1652 pthread_cond_wait(&parser->init_cond, &parser->mutex);
1656 query_tags(stream);
1658 /* Now that we're fully initialized, enable the stream so that further
1659 * samples get queued instead of being discarded. We don't actually need
1660 * the samples (in particular, the frontend should seek before
1661 * attempting to read anything), but we don't want to waste CPU time
1662 * trying to decode them. */
1663 stream->enabled = true;
1666 pthread_mutex_unlock(&parser->mutex);
1668 parser->next_offset = 0;
1669 return S_OK;
1671 out:
1672 if (parser->container)
1673 gst_element_set_state(parser->container, GST_STATE_NULL);
1674 if (parser->my_src)
1675 gst_object_unref(parser->my_src);
1677 for (i = 0; i < parser->stream_count; ++i)
1678 free_stream(parser->streams[i]);
1679 parser->stream_count = 0;
1680 free(parser->streams);
1681 parser->streams = NULL;
1683 if (parser->container)
1685 gst_element_set_bus(parser->container, NULL);
1686 gst_object_unref(parser->container);
1687 parser->container = NULL;
1690 g_free(parser->sink_caps);
1691 parser->sink_caps = NULL;
1693 pthread_mutex_lock(&parser->mutex);
1694 parser->sink_connected = false;
1695 pthread_mutex_unlock(&parser->mutex);
1696 pthread_cond_signal(&parser->read_cond);
1698 return E_FAIL;
1701 static NTSTATUS wg_parser_disconnect(void *args)
1703 struct wg_parser *parser = args;
1704 unsigned int i;
1706 /* Unblock all of our streams. */
1707 pthread_mutex_lock(&parser->mutex);
1708 for (i = 0; i < parser->stream_count; ++i)
1710 parser->streams[i]->flushing = true;
1711 pthread_cond_signal(&parser->streams[i]->event_empty_cond);
1713 pthread_mutex_unlock(&parser->mutex);
1715 gst_element_set_state(parser->container, GST_STATE_NULL);
1716 gst_object_unref(parser->my_src);
1717 parser->my_src = NULL;
1719 pthread_mutex_lock(&parser->mutex);
1720 parser->sink_connected = false;
1721 pthread_mutex_unlock(&parser->mutex);
1722 pthread_cond_signal(&parser->read_cond);
1724 for (i = 0; i < parser->stream_count; ++i)
1725 free_stream(parser->streams[i]);
1727 parser->stream_count = 0;
1728 free(parser->streams);
1729 parser->streams = NULL;
1731 gst_element_set_bus(parser->container, NULL);
1732 gst_object_unref(parser->container);
1733 parser->container = NULL;
1735 g_free(parser->sink_caps);
1736 parser->sink_caps = NULL;
1738 for (i = 0; i < ARRAY_SIZE(parser->input_cache_chunks); i++)
1740 if (parser->input_cache_chunks[i].data)
1742 free(parser->input_cache_chunks[i].data);
1743 parser->input_cache_chunks[i].data = NULL;
1747 return S_OK;
1750 static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
1752 GstElement *element;
1754 if (!(element = create_element("decodebin", "base")))
1755 return FALSE;
1757 gst_bin_add(GST_BIN(parser->container), element);
1758 parser->decodebin = element;
1760 if (parser->unlimited_buffering)
1762 g_object_set(parser->decodebin, "max-size-buffers", G_MAXUINT, NULL);
1763 g_object_set(parser->decodebin, "max-size-time", G_MAXUINT64, NULL);
1764 g_object_set(parser->decodebin, "max-size-bytes", G_MAXUINT, NULL);
1767 g_signal_connect(element, "pad-added", G_CALLBACK(pad_added_cb), parser);
1768 g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser);
1769 g_signal_connect(element, "autoplug-continue", G_CALLBACK(autoplug_continue_cb), parser);
1770 g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_select_cb), parser);
1771 g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
1773 pthread_mutex_lock(&parser->mutex);
1774 parser->no_more_pads = false;
1775 pthread_mutex_unlock(&parser->mutex);
1777 if (!link_src_to_element(parser->my_src, element))
1778 return FALSE;
1780 return TRUE;
1783 static BOOL avi_parser_init_gst(struct wg_parser *parser)
1785 GstElement *element;
1787 if (!(element = create_element("avidemux", "good")))
1788 return FALSE;
1790 gst_bin_add(GST_BIN(parser->container), element);
1792 g_signal_connect(element, "pad-added", G_CALLBACK(pad_added_cb), parser);
1793 g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser);
1794 g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
1796 pthread_mutex_lock(&parser->mutex);
1797 parser->no_more_pads = false;
1798 pthread_mutex_unlock(&parser->mutex);
1800 if (!link_src_to_element(parser->my_src, element))
1801 return FALSE;
1803 return TRUE;
1806 static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser)
1808 struct wg_parser_stream *stream;
1809 GstElement *element;
1811 if (!(element = create_element("mpegaudioparse", "good")))
1812 return FALSE;
1814 gst_bin_add(GST_BIN(parser->container), element);
1816 if (!link_src_to_element(parser->my_src, element))
1817 return FALSE;
1819 if (!(stream = create_stream(parser)))
1820 return FALSE;
1822 if (!link_element_to_sink(element, stream->my_sink))
1823 return FALSE;
1824 gst_pad_set_active(stream->my_sink, 1);
1826 parser->no_more_pads = true;
1828 return TRUE;
1831 static BOOL wave_parser_init_gst(struct wg_parser *parser)
1833 struct wg_parser_stream *stream;
1834 GstElement *element;
1836 if (!(element = create_element("wavparse", "good")))
1837 return FALSE;
1839 gst_bin_add(GST_BIN(parser->container), element);
1841 if (!link_src_to_element(parser->my_src, element))
1842 return FALSE;
1844 if (!(stream = create_stream(parser)))
1845 return FALSE;
1847 if (!link_element_to_sink(element, stream->my_sink))
1848 return FALSE;
1849 gst_pad_set_active(stream->my_sink, 1);
1851 parser->no_more_pads = true;
1853 return TRUE;
1856 static NTSTATUS wg_parser_create(void *args)
1858 static const init_gst_cb init_funcs[] =
1860 [WG_PARSER_DECODEBIN] = decodebin_parser_init_gst,
1861 [WG_PARSER_AVIDEMUX] = avi_parser_init_gst,
1862 [WG_PARSER_MPEGAUDIOPARSE] = mpeg_audio_parser_init_gst,
1863 [WG_PARSER_WAVPARSE] = wave_parser_init_gst,
1866 struct wg_parser_create_params *params = args;
1867 struct wg_parser *parser;
1869 if (!(parser = calloc(1, sizeof(*parser))))
1870 return E_OUTOFMEMORY;
1872 pthread_mutex_init(&parser->mutex, NULL);
1873 pthread_cond_init(&parser->init_cond, NULL);
1874 pthread_cond_init(&parser->read_cond, NULL);
1875 pthread_cond_init(&parser->read_done_cond, NULL);
1876 parser->init_gst = init_funcs[params->type];
1877 parser->unlimited_buffering = params->unlimited_buffering;
1878 parser->err_on = params->err_on;
1879 parser->warn_on = params->warn_on;
1880 GST_DEBUG("Created winegstreamer parser %p.", parser);
1881 params->parser = parser;
1882 return S_OK;
1885 static NTSTATUS wg_parser_destroy(void *args)
1887 struct wg_parser *parser = args;
1889 if (parser->bus)
1891 gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL);
1892 gst_object_unref(parser->bus);
1895 pthread_mutex_destroy(&parser->mutex);
1896 pthread_cond_destroy(&parser->init_cond);
1897 pthread_cond_destroy(&parser->read_cond);
1898 pthread_cond_destroy(&parser->read_done_cond);
1900 free(parser);
1901 return S_OK;
1904 const unixlib_entry_t __wine_unix_call_funcs[] =
1906 #define X(name) [unix_ ## name] = name
1907 X(wg_init_gstreamer),
1909 X(wg_parser_create),
1910 X(wg_parser_destroy),
1912 X(wg_parser_connect),
1913 X(wg_parser_disconnect),
1915 X(wg_parser_get_next_read_offset),
1916 X(wg_parser_push_data),
1918 X(wg_parser_get_stream_count),
1919 X(wg_parser_get_stream),
1921 X(wg_parser_stream_get_preferred_format),
1922 X(wg_parser_stream_get_codec_format),
1923 X(wg_parser_stream_enable),
1924 X(wg_parser_stream_disable),
1926 X(wg_parser_stream_get_buffer),
1927 X(wg_parser_stream_copy_buffer),
1928 X(wg_parser_stream_release_buffer),
1929 X(wg_parser_stream_notify_qos),
1931 X(wg_parser_stream_get_duration),
1932 X(wg_parser_stream_get_tag),
1933 X(wg_parser_stream_seek),
1935 X(wg_transform_create),
1936 X(wg_transform_destroy),
1937 X(wg_transform_set_output_format),
1939 X(wg_transform_push_data),
1940 X(wg_transform_read_data),
1941 X(wg_transform_get_status),