winegstreamer: Move some wg_parser / wg_transform helpers to unixlib.c.
[wine.git] / dlls / winegstreamer / wg_parser.c
blob49db4f3118e3b56b0b79c74ccd4a77f67a611c74
1 /*
2 * GStreamer parser backend
4 * Copyright 2010 Maarten Lankhorst for CodeWeavers
5 * Copyright 2010 Aric Stewart for CodeWeavers
6 * Copyright 2019-2020 Zebediah Figura
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
23 #if 0
24 #pragma makedep unix
25 #endif
27 #include "config.h"
29 #include <assert.h>
30 #include <stdarg.h>
31 #include <stdio.h>
33 #include <gst/gst.h>
34 #include <gst/video/video.h>
35 #include <gst/audio/audio.h>
36 #include <gst/tag/tag.h>
38 #include "ntstatus.h"
39 #define WIN32_NO_STATUS
40 #include "winternl.h"
41 #include "dshow.h"
43 #include "unix_private.h"
45 typedef enum
47 GST_AUTOPLUG_SELECT_TRY,
48 GST_AUTOPLUG_SELECT_EXPOSE,
49 GST_AUTOPLUG_SELECT_SKIP,
50 } GstAutoplugSelectResult;
52 typedef BOOL (*init_gst_cb)(struct wg_parser *parser);
54 struct wg_parser
56 init_gst_cb init_gst;
58 struct wg_parser_stream **streams;
59 unsigned int stream_count;
61 GstElement *container, *decodebin;
62 GstBus *bus;
63 GstPad *my_src, *their_sink;
65 guint64 file_size, start_offset, next_offset, stop_offset;
66 guint64 next_pull_offset;
68 pthread_t push_thread;
70 pthread_mutex_t mutex;
72 pthread_cond_t init_cond;
73 bool no_more_pads, has_duration, error;
74 bool err_on, warn_on;
76 pthread_cond_t read_cond, read_done_cond;
77 struct
79 GstBuffer *buffer;
80 uint64_t offset;
81 uint32_t size;
82 bool done;
83 GstFlowReturn ret;
84 } read_request;
86 bool sink_connected;
88 bool unlimited_buffering;
90 gchar *sink_caps;
93 struct wg_parser_stream
95 struct wg_parser *parser;
96 uint32_t number;
98 GstPad *their_src, *post_sink, *post_src, *my_sink;
99 GstElement *flip;
100 GstSegment segment;
101 struct wg_format preferred_format, current_format;
103 pthread_cond_t event_cond, event_empty_cond;
104 GstBuffer *buffer;
105 GstMapInfo map_info;
107 bool flushing, eos, enabled, has_caps, has_tags, has_buffer;
109 uint64_t duration;
110 gchar *tags[WG_PARSER_TAG_COUNT];
113 static NTSTATUS wg_parser_get_stream_count(void *args)
115 struct wg_parser_get_stream_count_params *params = args;
117 params->count = params->parser->stream_count;
118 return S_OK;
121 static NTSTATUS wg_parser_get_stream(void *args)
123 struct wg_parser_get_stream_params *params = args;
125 params->stream = params->parser->streams[params->index];
126 return S_OK;
129 static NTSTATUS wg_parser_get_next_read_offset(void *args)
131 struct wg_parser_get_next_read_offset_params *params = args;
132 struct wg_parser *parser = params->parser;
134 pthread_mutex_lock(&parser->mutex);
136 while (parser->sink_connected && !parser->read_request.size)
137 pthread_cond_wait(&parser->read_cond, &parser->mutex);
139 if (!parser->sink_connected)
141 pthread_mutex_unlock(&parser->mutex);
142 return VFW_E_WRONG_STATE;
145 params->offset = parser->read_request.offset;
146 params->size = parser->read_request.size;
148 pthread_mutex_unlock(&parser->mutex);
149 return S_OK;
152 static NTSTATUS wg_parser_push_data(void *args)
154 const struct wg_parser_push_data_params *params = args;
155 struct wg_parser *parser = params->parser;
156 const void *data = params->data;
157 uint32_t size = params->size;
159 pthread_mutex_lock(&parser->mutex);
161 if (data)
163 if (size)
165 GstMapInfo map_info;
167 /* Note that we don't allocate the buffer until we have a size.
168 * midiparse passes a NULL buffer and a size of UINT_MAX, in an
169 * apparent attempt to read the whole input stream at once. */
170 if (!parser->read_request.buffer)
171 parser->read_request.buffer = gst_buffer_new_and_alloc(size);
172 gst_buffer_map(parser->read_request.buffer, &map_info, GST_MAP_WRITE);
173 memcpy(map_info.data, data, size);
174 gst_buffer_unmap(parser->read_request.buffer, &map_info);
175 parser->read_request.ret = GST_FLOW_OK;
177 else
179 parser->read_request.ret = GST_FLOW_EOS;
182 else
184 parser->read_request.ret = GST_FLOW_ERROR;
186 parser->read_request.done = true;
187 parser->read_request.size = 0;
189 pthread_mutex_unlock(&parser->mutex);
190 pthread_cond_signal(&parser->read_done_cond);
192 return S_OK;
195 static NTSTATUS wg_parser_stream_get_preferred_format(void *args)
197 const struct wg_parser_stream_get_preferred_format_params *params = args;
199 *params->format = params->stream->preferred_format;
200 return S_OK;
203 static NTSTATUS wg_parser_stream_enable(void *args)
205 const struct wg_parser_stream_enable_params *params = args;
206 struct wg_parser_stream *stream = params->stream;
207 const struct wg_format *format = params->format;
208 struct wg_parser *parser = stream->parser;
210 pthread_mutex_lock(&parser->mutex);
212 stream->current_format = *format;
213 stream->enabled = true;
215 pthread_mutex_unlock(&parser->mutex);
217 if (format->major_type == WG_MAJOR_TYPE_VIDEO)
219 bool flip = (format->u.video.height < 0);
221 gst_util_set_object_arg(G_OBJECT(stream->flip), "method", flip ? "vertical-flip" : "none");
224 gst_pad_push_event(stream->my_sink, gst_event_new_reconfigure());
225 return S_OK;
228 static NTSTATUS wg_parser_stream_disable(void *args)
230 struct wg_parser_stream *stream = args;
231 struct wg_parser *parser = stream->parser;
233 pthread_mutex_lock(&parser->mutex);
234 stream->enabled = false;
235 stream->current_format.major_type = WG_MAJOR_TYPE_UNKNOWN;
236 pthread_mutex_unlock(&parser->mutex);
237 pthread_cond_signal(&stream->event_empty_cond);
238 return S_OK;
241 static GstBuffer *wait_parser_stream_buffer(struct wg_parser *parser, struct wg_parser_stream *stream)
243 GstBuffer *buffer = NULL;
245 /* Note that we can both have a buffer and stream->eos, in which case we
246 * must return the buffer. */
248 while (stream->enabled && !(buffer = stream->buffer) && !stream->eos)
249 pthread_cond_wait(&stream->event_cond, &parser->mutex);
251 return buffer;
254 static NTSTATUS wg_parser_stream_get_buffer(void *args)
256 const struct wg_parser_stream_get_buffer_params *params = args;
257 struct wg_parser_buffer *wg_buffer = params->buffer;
258 struct wg_parser_stream *stream = params->stream;
259 struct wg_parser *parser = params->parser;
260 GstBuffer *buffer;
261 unsigned int i;
263 pthread_mutex_lock(&parser->mutex);
265 if (stream)
266 buffer = wait_parser_stream_buffer(parser, stream);
267 else
269 /* Find the earliest buffer by PTS.
271 * Native seems to behave similarly to this with the wm async reader, although our
272 * unit tests show that it's not entirely consistent—some frames are received
273 * slightly out of order. It's possible that one stream is being manually offset
274 * to account for decoding latency.
276 * The behaviour with the wm sync reader, when stream 0 is requested, seems
277 * consistent with this hypothesis, but with a much larger offset—the video
278 * stream seems to be "behind" by about 150 ms.
280 * The main reason for doing this is that the video and audio stream probably
281 * don't have quite the same "frame rate", and we don't want to force one stream
282 * to decode faster just to keep up with the other. Delivering samples in PTS
283 * order should avoid that problem. */
284 GstBuffer *earliest = NULL;
286 for (i = 0; i < parser->stream_count; ++i)
288 if (!(buffer = wait_parser_stream_buffer(parser, parser->streams[i])))
289 continue;
290 /* invalid PTS is GST_CLOCK_TIME_NONE == (guint64)-1, so this will prefer valid timestamps. */
291 if (!earliest || GST_BUFFER_PTS(buffer) < GST_BUFFER_PTS(earliest))
293 stream = parser->streams[i];
294 earliest = buffer;
298 buffer = earliest;
301 if (!buffer)
303 pthread_mutex_unlock(&parser->mutex);
304 return S_FALSE;
307 /* FIXME: Should we use gst_segment_to_stream_time_full()? Under what
308 * circumstances is the stream time not equal to the buffer PTS? Note
309 * that this will need modification to wg_parser_stream_notify_qos() as
310 * well. */
312 if ((wg_buffer->has_pts = GST_BUFFER_PTS_IS_VALID(buffer)))
313 wg_buffer->pts = GST_BUFFER_PTS(buffer) / 100;
314 if ((wg_buffer->has_duration = GST_BUFFER_DURATION_IS_VALID(buffer)))
315 wg_buffer->duration = GST_BUFFER_DURATION(buffer) / 100;
316 wg_buffer->discontinuity = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DISCONT);
317 wg_buffer->preroll = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_LIVE);
318 wg_buffer->delta = GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
319 wg_buffer->size = gst_buffer_get_size(buffer);
320 wg_buffer->stream = stream->number;
322 pthread_mutex_unlock(&parser->mutex);
323 return S_OK;
326 static NTSTATUS wg_parser_stream_copy_buffer(void *args)
328 const struct wg_parser_stream_copy_buffer_params *params = args;
329 struct wg_parser_stream *stream = params->stream;
330 struct wg_parser *parser = stream->parser;
331 uint32_t offset = params->offset;
332 uint32_t size = params->size;
334 pthread_mutex_lock(&parser->mutex);
336 if (!stream->buffer)
338 pthread_mutex_unlock(&parser->mutex);
339 return VFW_E_WRONG_STATE;
342 assert(offset < stream->map_info.size);
343 assert(offset + size <= stream->map_info.size);
344 memcpy(params->data, stream->map_info.data + offset, size);
346 pthread_mutex_unlock(&parser->mutex);
347 return S_OK;
350 static NTSTATUS wg_parser_stream_release_buffer(void *args)
352 struct wg_parser_stream *stream = args;
353 struct wg_parser *parser = stream->parser;
355 pthread_mutex_lock(&parser->mutex);
357 assert(stream->buffer);
359 gst_buffer_unmap(stream->buffer, &stream->map_info);
360 gst_buffer_unref(stream->buffer);
361 stream->buffer = NULL;
363 pthread_mutex_unlock(&parser->mutex);
364 pthread_cond_signal(&stream->event_empty_cond);
366 return S_OK;
369 static NTSTATUS wg_parser_stream_get_duration(void *args)
371 struct wg_parser_stream_get_duration_params *params = args;
373 params->duration = params->stream->duration;
374 return S_OK;
377 static NTSTATUS wg_parser_stream_get_tag(void *args)
379 struct wg_parser_stream_get_tag_params *params = args;
380 uint32_t len;
382 if (params->tag >= WG_PARSER_TAG_COUNT)
383 return STATUS_INVALID_PARAMETER;
384 if (!params->stream->tags[params->tag])
385 return STATUS_NOT_FOUND;
386 if ((len = strlen(params->stream->tags[params->tag]) + 1) > *params->size)
388 *params->size = len;
389 return STATUS_BUFFER_TOO_SMALL;
391 memcpy(params->buffer, params->stream->tags[params->tag], len);
392 return STATUS_SUCCESS;
395 static NTSTATUS wg_parser_stream_seek(void *args)
397 GstSeekType start_type = GST_SEEK_TYPE_SET, stop_type = GST_SEEK_TYPE_SET;
398 const struct wg_parser_stream_seek_params *params = args;
399 DWORD start_flags = params->start_flags;
400 DWORD stop_flags = params->stop_flags;
401 GstSeekFlags flags = 0;
403 if (start_flags & AM_SEEKING_SeekToKeyFrame)
404 flags |= GST_SEEK_FLAG_KEY_UNIT;
405 if (start_flags & AM_SEEKING_Segment)
406 flags |= GST_SEEK_FLAG_SEGMENT;
407 if (!(start_flags & AM_SEEKING_NoFlush))
408 flags |= GST_SEEK_FLAG_FLUSH;
410 if ((start_flags & AM_SEEKING_PositioningBitsMask) == AM_SEEKING_NoPositioning)
411 start_type = GST_SEEK_TYPE_NONE;
412 if ((stop_flags & AM_SEEKING_PositioningBitsMask) == AM_SEEKING_NoPositioning)
413 stop_type = GST_SEEK_TYPE_NONE;
415 if (!gst_pad_push_event(params->stream->my_sink, gst_event_new_seek(params->rate, GST_FORMAT_TIME,
416 flags, start_type, params->start_pos * 100, stop_type, params->stop_pos * 100)))
417 GST_ERROR("Failed to seek.\n");
419 return S_OK;
422 static NTSTATUS wg_parser_stream_notify_qos(void *args)
424 const struct wg_parser_stream_notify_qos_params *params = args;
425 struct wg_parser_stream *stream = params->stream;
426 GstClockTime stream_time;
427 GstEvent *event;
429 /* We return timestamps in stream time, i.e. relative to the start of the
430 * file (or other medium), but gst_event_new_qos() expects the timestamp in
431 * running time. */
432 stream_time = gst_segment_to_running_time(&stream->segment, GST_FORMAT_TIME, params->timestamp * 100);
433 if (stream_time == -1)
435 /* This can happen legitimately if the sample falls outside of the
436 * segment bounds. GStreamer elements shouldn't present the sample in
437 * that case, but DirectShow doesn't care. */
438 GST_LOG("Ignoring QoS event.\n");
439 return S_OK;
442 if (!(event = gst_event_new_qos(params->underflow ? GST_QOS_TYPE_UNDERFLOW : GST_QOS_TYPE_OVERFLOW,
443 params->proportion, params->diff * 100, stream_time)))
444 GST_ERROR("Failed to create QOS event.\n");
445 gst_pad_push_event(stream->my_sink, event);
447 return S_OK;
450 static GstAutoplugSelectResult autoplug_select_cb(GstElement *bin, GstPad *pad,
451 GstCaps *caps, GstElementFactory *fact, gpointer user)
453 struct wg_parser *parser = user;
454 const char *name = gst_element_factory_get_longname(fact);
455 const char *klass = gst_element_factory_get_klass(fact);
457 GST_INFO("Using \"%s\".", name);
459 if (strstr(name, "Player protection"))
461 GST_WARNING("Blacklisted a/52 decoder because it only works in Totem.");
462 return GST_AUTOPLUG_SELECT_SKIP;
464 if (!strcmp(name, "Fluendo Hardware Accelerated Video Decoder"))
466 GST_WARNING("Disabled video acceleration since it breaks in wine.");
467 return GST_AUTOPLUG_SELECT_SKIP;
470 if (!parser->sink_caps && strstr(klass, GST_ELEMENT_FACTORY_KLASS_DEMUXER))
471 parser->sink_caps = g_strdup(gst_structure_get_name(gst_caps_get_structure(caps, 0)));
473 return GST_AUTOPLUG_SELECT_TRY;
476 static void no_more_pads_cb(GstElement *element, gpointer user)
478 struct wg_parser *parser = user;
480 GST_DEBUG("parser %p.", parser);
482 pthread_mutex_lock(&parser->mutex);
483 parser->no_more_pads = true;
484 pthread_mutex_unlock(&parser->mutex);
485 pthread_cond_signal(&parser->init_cond);
488 static gboolean sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
490 struct wg_parser_stream *stream = gst_pad_get_element_private(pad);
491 struct wg_parser *parser = stream->parser;
493 GST_LOG("stream %p, type \"%s\".", stream, GST_EVENT_TYPE_NAME(event));
495 switch (event->type)
497 case GST_EVENT_SEGMENT:
498 pthread_mutex_lock(&parser->mutex);
499 if (stream->enabled)
501 const GstSegment *segment;
503 gst_event_parse_segment(event, &segment);
505 if (segment->format != GST_FORMAT_TIME)
507 pthread_mutex_unlock(&parser->mutex);
508 GST_FIXME("Unhandled format \"%s\".", gst_format_get_name(segment->format));
509 break;
512 gst_segment_copy_into(segment, &stream->segment);
514 pthread_mutex_unlock(&parser->mutex);
515 break;
517 case GST_EVENT_EOS:
518 pthread_mutex_lock(&parser->mutex);
519 stream->eos = true;
520 if (stream->enabled)
521 pthread_cond_signal(&stream->event_cond);
522 else
523 pthread_cond_signal(&parser->init_cond);
524 pthread_mutex_unlock(&parser->mutex);
525 break;
527 case GST_EVENT_FLUSH_START:
528 pthread_mutex_lock(&parser->mutex);
530 if (stream->enabled)
532 stream->flushing = true;
533 pthread_cond_signal(&stream->event_empty_cond);
535 if (stream->buffer)
537 gst_buffer_unmap(stream->buffer, &stream->map_info);
538 gst_buffer_unref(stream->buffer);
539 stream->buffer = NULL;
543 pthread_mutex_unlock(&parser->mutex);
544 break;
546 case GST_EVENT_FLUSH_STOP:
548 gboolean reset_time;
550 gst_event_parse_flush_stop(event, &reset_time);
552 if (reset_time)
553 gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED);
555 pthread_mutex_lock(&parser->mutex);
557 stream->eos = false;
558 if (stream->enabled)
559 stream->flushing = false;
561 pthread_mutex_unlock(&parser->mutex);
562 break;
565 case GST_EVENT_CAPS:
567 GstCaps *caps;
569 gst_event_parse_caps(event, &caps);
570 pthread_mutex_lock(&parser->mutex);
571 wg_format_from_caps(&stream->preferred_format, caps);
572 stream->has_caps = true;
573 pthread_mutex_unlock(&parser->mutex);
574 pthread_cond_signal(&parser->init_cond);
575 break;
578 case GST_EVENT_TAG:
579 pthread_mutex_lock(&parser->mutex);
580 stream->has_tags = true;
581 pthread_cond_signal(&parser->init_cond);
582 pthread_mutex_unlock(&parser->mutex);
583 break;
585 default:
586 GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
588 gst_event_unref(event);
589 return TRUE;
592 static GstFlowReturn sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer)
594 struct wg_parser_stream *stream = gst_pad_get_element_private(pad);
595 struct wg_parser *parser = stream->parser;
597 GST_LOG("stream %p, buffer %p.", stream, buffer);
599 pthread_mutex_lock(&parser->mutex);
601 if (!stream->has_buffer)
603 stream->has_buffer = true;
604 pthread_cond_signal(&parser->init_cond);
607 /* Allow this buffer to be flushed by GStreamer. We are effectively
608 * implementing a queue object here. */
610 while (stream->enabled && !stream->flushing && stream->buffer)
611 pthread_cond_wait(&stream->event_empty_cond, &parser->mutex);
613 if (!stream->enabled)
615 pthread_mutex_unlock(&parser->mutex);
616 gst_buffer_unref(buffer);
617 return GST_FLOW_OK;
620 if (stream->flushing)
622 pthread_mutex_unlock(&parser->mutex);
623 GST_DEBUG("Stream is flushing; discarding buffer.");
624 gst_buffer_unref(buffer);
625 return GST_FLOW_FLUSHING;
628 if (!gst_buffer_map(buffer, &stream->map_info, GST_MAP_READ))
630 pthread_mutex_unlock(&parser->mutex);
631 GST_ERROR("Failed to map buffer.\n");
632 gst_buffer_unref(buffer);
633 return GST_FLOW_ERROR;
636 stream->buffer = buffer;
638 pthread_mutex_unlock(&parser->mutex);
639 pthread_cond_signal(&stream->event_cond);
641 /* The chain callback is given a reference to the buffer. Transfer that
642 * reference to the stream object, which will release it in
643 * wg_parser_stream_release_buffer(). */
645 GST_LOG("Buffer queued.");
646 return GST_FLOW_OK;
649 static gboolean sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
651 struct wg_parser_stream *stream = gst_pad_get_element_private(pad);
652 struct wg_parser *parser = stream->parser;
654 GST_LOG("stream %p, type \"%s\".", stream, gst_query_type_get_name(query->type));
656 switch (query->type)
658 case GST_QUERY_CAPS:
660 GstCaps *caps, *filter, *temp;
661 gchar *str;
662 gsize i;
664 gst_query_parse_caps(query, &filter);
666 pthread_mutex_lock(&parser->mutex);
667 caps = wg_format_to_caps(&stream->current_format);
668 pthread_mutex_unlock(&parser->mutex);
670 if (!caps)
671 return FALSE;
673 /* Clear some fields that shouldn't prevent us from connecting. */
674 for (i = 0; i < gst_caps_get_size(caps); ++i)
675 gst_structure_remove_fields(gst_caps_get_structure(caps, i),
676 "framerate", "pixel-aspect-ratio", "colorimetry", "chroma-site", NULL);
678 str = gst_caps_to_string(caps);
679 GST_LOG("Stream caps are \"%s\".", str);
680 g_free(str);
682 if (filter)
684 temp = gst_caps_intersect(caps, filter);
685 gst_caps_unref(caps);
686 caps = temp;
689 gst_query_set_caps_result(query, caps);
690 gst_caps_unref(caps);
691 return TRUE;
694 case GST_QUERY_ACCEPT_CAPS:
696 struct wg_format format;
697 gboolean ret = TRUE;
698 GstCaps *caps;
700 pthread_mutex_lock(&parser->mutex);
702 if (stream->current_format.major_type == WG_MAJOR_TYPE_UNKNOWN)
704 pthread_mutex_unlock(&parser->mutex);
705 gst_query_set_accept_caps_result(query, TRUE);
706 return TRUE;
709 gst_query_parse_accept_caps(query, &caps);
710 wg_format_from_caps(&format, caps);
711 ret = wg_format_compare(&format, &stream->current_format);
713 pthread_mutex_unlock(&parser->mutex);
715 if (!ret && gst_debug_category_get_threshold(GST_CAT_DEFAULT) >= GST_LEVEL_WARNING)
717 gchar *str = gst_caps_to_string(caps);
718 GST_WARNING("Rejecting caps \"%s\".", str);
719 g_free(str);
721 gst_query_set_accept_caps_result(query, ret);
722 return TRUE;
725 default:
726 return gst_pad_query_default (pad, parent, query);
730 static struct wg_parser_stream *create_stream(struct wg_parser *parser)
732 struct wg_parser_stream *stream, **new_array;
733 char pad_name[19];
735 if (!(new_array = realloc(parser->streams, (parser->stream_count + 1) * sizeof(*parser->streams))))
736 return NULL;
737 parser->streams = new_array;
739 if (!(stream = calloc(1, sizeof(*stream))))
740 return NULL;
742 gst_segment_init(&stream->segment, GST_FORMAT_UNDEFINED);
744 stream->parser = parser;
745 stream->number = parser->stream_count;
746 stream->current_format.major_type = WG_MAJOR_TYPE_UNKNOWN;
747 pthread_cond_init(&stream->event_cond, NULL);
748 pthread_cond_init(&stream->event_empty_cond, NULL);
750 sprintf(pad_name, "qz_sink_%u", parser->stream_count);
751 stream->my_sink = gst_pad_new(pad_name, GST_PAD_SINK);
752 gst_pad_set_element_private(stream->my_sink, stream);
753 gst_pad_set_chain_function(stream->my_sink, sink_chain_cb);
754 gst_pad_set_event_function(stream->my_sink, sink_event_cb);
755 gst_pad_set_query_function(stream->my_sink, sink_query_cb);
757 parser->streams[parser->stream_count++] = stream;
758 return stream;
761 static void free_stream(struct wg_parser_stream *stream)
763 unsigned int i;
765 if (stream->their_src)
767 if (stream->post_sink)
769 gst_object_unref(stream->post_src);
770 gst_object_unref(stream->post_sink);
771 stream->post_src = stream->post_sink = NULL;
773 gst_object_unref(stream->their_src);
775 gst_object_unref(stream->my_sink);
777 pthread_cond_destroy(&stream->event_cond);
778 pthread_cond_destroy(&stream->event_empty_cond);
780 for (i = 0; i < ARRAY_SIZE(stream->tags); ++i)
782 if (stream->tags[i])
783 g_free(stream->tags[i]);
785 free(stream);
788 static void pad_added_cb(GstElement *element, GstPad *pad, gpointer user)
790 struct wg_parser *parser = user;
791 struct wg_parser_stream *stream;
792 const char *name;
793 GstCaps *caps;
794 int ret;
796 GST_LOG("parser %p, element %p, pad %p.", parser, element, pad);
798 if (gst_pad_is_linked(pad))
799 return;
801 caps = gst_pad_query_caps(pad, NULL);
802 name = gst_structure_get_name(gst_caps_get_structure(caps, 0));
804 if (!(stream = create_stream(parser)))
805 goto out;
807 if (!strcmp(name, "video/x-raw"))
809 GstElement *deinterlace, *vconv, *flip, *vconv2;
811 /* DirectShow can express interlaced video, but downstream filters can't
812 * necessarily consume it. In particular, the video renderer can't. */
813 if (!(deinterlace = create_element("deinterlace", "good")))
814 goto out;
816 /* decodebin considers many YUV formats to be "raw", but some quartz
817 * filters can't handle those. Also, videoflip can't handle all "raw"
818 * formats either. Add a videoconvert to swap color spaces. */
819 if (!(vconv = create_element("videoconvert", "base")))
820 goto out;
822 /* GStreamer outputs RGB video top-down, but DirectShow expects bottom-up. */
823 if (!(flip = create_element("videoflip", "good")))
824 goto out;
826 /* videoflip does not support 15 and 16-bit RGB so add a second videoconvert
827 * to do the final conversion. */
828 if (!(vconv2 = create_element("videoconvert", "base")))
829 goto out;
831 /* The bin takes ownership of these elements. */
832 gst_bin_add(GST_BIN(parser->container), deinterlace);
833 gst_element_sync_state_with_parent(deinterlace);
834 gst_bin_add(GST_BIN(parser->container), vconv);
835 gst_element_sync_state_with_parent(vconv);
836 gst_bin_add(GST_BIN(parser->container), flip);
837 gst_element_sync_state_with_parent(flip);
838 gst_bin_add(GST_BIN(parser->container), vconv2);
839 gst_element_sync_state_with_parent(vconv2);
841 gst_element_link(deinterlace, vconv);
842 gst_element_link(vconv, flip);
843 gst_element_link(flip, vconv2);
845 stream->post_sink = gst_element_get_static_pad(deinterlace, "sink");
846 stream->post_src = gst_element_get_static_pad(vconv2, "src");
847 stream->flip = flip;
849 else if (!strcmp(name, "audio/x-raw"))
851 GstElement *convert;
853 /* Currently our dsound can't handle 64-bit formats or all
854 * surround-sound configurations. Native dsound can't always handle
855 * 64-bit formats either. Add an audioconvert to allow changing bit
856 * depth and channel count. */
857 if (!(convert = create_element("audioconvert", "base")))
858 goto out;
860 gst_bin_add(GST_BIN(parser->container), convert);
861 gst_element_sync_state_with_parent(convert);
863 stream->post_sink = gst_element_get_static_pad(convert, "sink");
864 stream->post_src = gst_element_get_static_pad(convert, "src");
867 if (stream->post_sink)
869 if ((ret = gst_pad_link(pad, stream->post_sink)) < 0)
871 GST_ERROR("Failed to link decodebin source pad to post-processing elements, error %s.",
872 gst_pad_link_get_name(ret));
873 gst_object_unref(stream->post_sink);
874 stream->post_sink = NULL;
875 goto out;
878 if ((ret = gst_pad_link(stream->post_src, stream->my_sink)) < 0)
880 GST_ERROR("Failed to link post-processing elements to our sink pad, error %s.",
881 gst_pad_link_get_name(ret));
882 gst_object_unref(stream->post_src);
883 stream->post_src = NULL;
884 gst_object_unref(stream->post_sink);
885 stream->post_sink = NULL;
886 goto out;
889 else if ((ret = gst_pad_link(pad, stream->my_sink)) < 0)
891 GST_ERROR("Failed to link decodebin source pad to our sink pad, error %s.",
892 gst_pad_link_get_name(ret));
893 goto out;
896 gst_pad_set_active(stream->my_sink, 1);
897 gst_object_ref(stream->their_src = pad);
898 out:
899 gst_caps_unref(caps);
902 static void pad_removed_cb(GstElement *element, GstPad *pad, gpointer user)
904 struct wg_parser *parser = user;
905 unsigned int i;
906 char *name;
908 GST_LOG("parser %p, element %p, pad %p.", parser, element, pad);
910 for (i = 0; i < parser->stream_count; ++i)
912 struct wg_parser_stream *stream = parser->streams[i];
914 if (stream->their_src == pad)
916 if (stream->post_sink)
917 gst_pad_unlink(stream->their_src, stream->post_sink);
918 else
919 gst_pad_unlink(stream->their_src, stream->my_sink);
920 gst_object_unref(stream->their_src);
921 stream->their_src = NULL;
922 return;
926 name = gst_pad_get_name(pad);
927 GST_WARNING("No pin matching pad \"%s\" found.", name);
928 g_free(name);
931 static GstFlowReturn src_getrange_cb(GstPad *pad, GstObject *parent,
932 guint64 offset, guint size, GstBuffer **buffer)
934 struct wg_parser *parser = gst_pad_get_element_private(pad);
935 GstFlowReturn ret;
937 GST_LOG("pad %p, offset %" G_GINT64_MODIFIER "u, size %u, buffer %p.", pad, offset, size, *buffer);
939 if (offset == GST_BUFFER_OFFSET_NONE)
940 offset = parser->next_pull_offset;
941 parser->next_pull_offset = offset + size;
943 if (!size)
945 /* asfreader occasionally asks for zero bytes. gst_buffer_map() will
946 * return NULL in this case. Avoid confusing the read thread by asking
947 * it for zero bytes. */
948 if (!*buffer)
949 *buffer = gst_buffer_new_and_alloc(0);
950 gst_buffer_set_size(*buffer, 0);
951 GST_LOG("Returning empty buffer.");
952 return GST_FLOW_OK;
955 pthread_mutex_lock(&parser->mutex);
957 assert(!parser->read_request.size);
958 parser->read_request.buffer = *buffer;
959 parser->read_request.offset = offset;
960 parser->read_request.size = size;
961 parser->read_request.done = false;
962 pthread_cond_signal(&parser->read_cond);
964 /* Note that we don't unblock this wait on GST_EVENT_FLUSH_START. We expect
965 * the upstream pin to flush if necessary. We should never be blocked on
966 * read_thread() not running. */
968 while (!parser->read_request.done)
969 pthread_cond_wait(&parser->read_done_cond, &parser->mutex);
971 *buffer = parser->read_request.buffer;
972 ret = parser->read_request.ret;
974 pthread_mutex_unlock(&parser->mutex);
976 GST_LOG("Request returned %s.", gst_flow_get_name(ret));
978 return ret;
981 static gboolean src_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
983 struct wg_parser *parser = gst_pad_get_element_private(pad);
984 GstFormat format;
986 GST_LOG("parser %p, type %s.", parser, GST_QUERY_TYPE_NAME(query));
988 switch (GST_QUERY_TYPE(query))
990 case GST_QUERY_DURATION:
991 gst_query_parse_duration(query, &format, NULL);
992 if (format == GST_FORMAT_PERCENT)
994 gst_query_set_duration(query, GST_FORMAT_PERCENT, GST_FORMAT_PERCENT_MAX);
995 return TRUE;
997 else if (format == GST_FORMAT_BYTES)
999 gst_query_set_duration(query, GST_FORMAT_BYTES, parser->file_size);
1000 return TRUE;
1002 return FALSE;
1004 case GST_QUERY_SEEKING:
1005 gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1006 if (format != GST_FORMAT_BYTES)
1008 GST_WARNING("Cannot seek using format \"%s\".", gst_format_get_name(format));
1009 return FALSE;
1011 gst_query_set_seeking(query, GST_FORMAT_BYTES, 1, 0, parser->file_size);
1012 return TRUE;
1014 case GST_QUERY_SCHEDULING:
1015 gst_query_set_scheduling(query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1016 gst_query_add_scheduling_mode(query, GST_PAD_MODE_PUSH);
1017 gst_query_add_scheduling_mode(query, GST_PAD_MODE_PULL);
1018 return TRUE;
1020 default:
1021 GST_WARNING("Unhandled query type %s.", GST_QUERY_TYPE_NAME(query));
1022 return FALSE;
1026 static void *push_data(void *arg)
1028 struct wg_parser *parser = arg;
1029 GstBuffer *buffer;
1030 guint max_size;
1032 GST_DEBUG("Starting push thread.");
1034 if (!(buffer = gst_buffer_new_allocate(NULL, 16384, NULL)))
1036 GST_ERROR("Failed to allocate memory.");
1037 return NULL;
1040 max_size = parser->stop_offset ? parser->stop_offset : parser->file_size;
1042 for (;;)
1044 ULONG size;
1045 int ret;
1047 if (parser->next_offset >= max_size)
1048 break;
1049 size = min(16384, max_size - parser->next_offset);
1051 if ((ret = src_getrange_cb(parser->my_src, NULL, parser->next_offset, size, &buffer)) < 0)
1053 GST_ERROR("Failed to read data, ret %s.", gst_flow_get_name(ret));
1054 break;
1057 parser->next_offset += size;
1059 buffer->duration = buffer->pts = -1;
1060 if ((ret = gst_pad_push(parser->my_src, buffer)) < 0)
1062 GST_ERROR("Failed to push data, ret %s.", gst_flow_get_name(ret));
1063 break;
1067 gst_buffer_unref(buffer);
1069 gst_pad_push_event(parser->my_src, gst_event_new_eos());
1071 GST_DEBUG("Stopping push thread.");
1073 return NULL;
1076 static gboolean activate_push(GstPad *pad, gboolean activate)
1078 struct wg_parser *parser = gst_pad_get_element_private(pad);
1080 if (!activate)
1082 if (parser->push_thread)
1084 pthread_join(parser->push_thread, NULL);
1085 parser->push_thread = 0;
1088 else if (!parser->push_thread)
1090 int ret;
1092 if ((ret = pthread_create(&parser->push_thread, NULL, push_data, parser)))
1094 GST_ERROR("Failed to create push thread: %s", strerror(errno));
1095 parser->push_thread = 0;
1096 return FALSE;
1099 return TRUE;
1102 static gboolean src_activate_mode_cb(GstPad *pad, GstObject *parent, GstPadMode mode, gboolean activate)
1104 struct wg_parser *parser = gst_pad_get_element_private(pad);
1106 GST_DEBUG("%s source pad for parser %p in %s mode.",
1107 activate ? "Activating" : "Deactivating", parser, gst_pad_mode_get_name(mode));
1109 switch (mode)
1111 case GST_PAD_MODE_PULL:
1112 return TRUE;
1113 case GST_PAD_MODE_PUSH:
1114 return activate_push(pad, activate);
1115 case GST_PAD_MODE_NONE:
1116 break;
1118 return FALSE;
1121 static GstBusSyncReply bus_handler_cb(GstBus *bus, GstMessage *msg, gpointer user)
1123 struct wg_parser *parser = user;
1124 gchar *dbg_info = NULL;
1125 GError *err = NULL;
1127 GST_DEBUG("parser %p, message type %s.", parser, GST_MESSAGE_TYPE_NAME(msg));
1129 switch (msg->type)
1131 case GST_MESSAGE_ERROR:
1132 gst_message_parse_error(msg, &err, &dbg_info);
1133 if (parser->err_on)
1135 fprintf(stderr, "winegstreamer error: %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
1136 fprintf(stderr, "winegstreamer error: %s: %s\n", GST_OBJECT_NAME(msg->src), dbg_info);
1138 g_error_free(err);
1139 g_free(dbg_info);
1140 pthread_mutex_lock(&parser->mutex);
1141 parser->error = true;
1142 pthread_mutex_unlock(&parser->mutex);
1143 pthread_cond_signal(&parser->init_cond);
1144 break;
1146 case GST_MESSAGE_WARNING:
1147 gst_message_parse_warning(msg, &err, &dbg_info);
1148 if (parser->warn_on)
1150 fprintf(stderr, "winegstreamer warning: %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
1151 fprintf(stderr, "winegstreamer warning: %s: %s\n", GST_OBJECT_NAME(msg->src), dbg_info);
1153 g_error_free(err);
1154 g_free(dbg_info);
1155 break;
1157 case GST_MESSAGE_DURATION_CHANGED:
1158 pthread_mutex_lock(&parser->mutex);
1159 parser->has_duration = true;
1160 pthread_mutex_unlock(&parser->mutex);
1161 pthread_cond_signal(&parser->init_cond);
1162 break;
1164 default:
1165 break;
1167 gst_message_unref(msg);
1168 return GST_BUS_DROP;
1171 static gboolean src_perform_seek(struct wg_parser *parser, GstEvent *event)
1173 BOOL thread = !!parser->push_thread;
1174 GstSeekType cur_type, stop_type;
1175 GstFormat seek_format;
1176 GstEvent *flush_event;
1177 GstSeekFlags flags;
1178 gint64 cur, stop;
1179 guint32 seqnum;
1180 gdouble rate;
1182 gst_event_parse_seek(event, &rate, &seek_format, &flags,
1183 &cur_type, &cur, &stop_type, &stop);
1185 if (seek_format != GST_FORMAT_BYTES)
1187 GST_FIXME("Unhandled format \"%s\".", gst_format_get_name(seek_format));
1188 return FALSE;
1191 seqnum = gst_event_get_seqnum(event);
1193 /* send flush start */
1194 if (flags & GST_SEEK_FLAG_FLUSH)
1196 flush_event = gst_event_new_flush_start();
1197 gst_event_set_seqnum(flush_event, seqnum);
1198 gst_pad_push_event(parser->my_src, flush_event);
1199 if (thread)
1200 gst_pad_set_active(parser->my_src, 1);
1203 parser->next_offset = parser->start_offset = cur;
1205 /* and prepare to continue streaming */
1206 if (flags & GST_SEEK_FLAG_FLUSH)
1208 flush_event = gst_event_new_flush_stop(TRUE);
1209 gst_event_set_seqnum(flush_event, seqnum);
1210 gst_pad_push_event(parser->my_src, flush_event);
1211 if (thread)
1212 gst_pad_set_active(parser->my_src, 1);
1215 return TRUE;
1218 static gboolean src_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
1220 struct wg_parser *parser = gst_pad_get_element_private(pad);
1221 gboolean ret = TRUE;
1223 GST_LOG("parser %p, type \"%s\".", parser, GST_EVENT_TYPE_NAME(event));
1225 switch (event->type)
1227 case GST_EVENT_SEEK:
1228 ret = src_perform_seek(parser, event);
1229 break;
1231 case GST_EVENT_FLUSH_START:
1232 case GST_EVENT_FLUSH_STOP:
1233 case GST_EVENT_QOS:
1234 case GST_EVENT_RECONFIGURE:
1235 break;
1237 default:
1238 GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
1239 ret = FALSE;
1240 break;
1242 gst_event_unref(event);
1243 return ret;
1246 static void query_tags(struct wg_parser_stream *stream)
1248 const gchar *struct_name;
1249 GstEvent *tag_event;
1250 guint i, j;
1252 stream->tags[WG_PARSER_TAG_NAME] = NULL;
1253 stream->tags[WG_PARSER_TAG_LANGUAGE] = NULL;
1255 i = 0;
1256 while ((tag_event = gst_pad_get_sticky_event(stream->their_src, GST_EVENT_TAG, i++)))
1258 GstTagList *tag_list;
1260 gst_event_parse_tag(tag_event, &tag_list);
1262 if (!stream->tags[WG_PARSER_TAG_NAME])
1264 /* Extract stream name from Quick Time demuxer private tag where it puts unrecognized chunks. */
1265 const GValue *val;
1266 GstSample *sample;
1267 GstBuffer *buf;
1268 gsize size;
1269 guint tag_count = gst_tag_list_get_tag_size(tag_list, "private-qt-tag");
1271 for (j = 0; j < tag_count; ++j)
1273 if (!(val = gst_tag_list_get_value_index(tag_list, "private-qt-tag", j)))
1274 continue;
1275 if (!GST_VALUE_HOLDS_SAMPLE(val) || !(sample = gst_value_get_sample(val)))
1276 continue;
1277 struct_name = gst_structure_get_name(gst_sample_get_info(sample));
1278 if (!struct_name || strcmp(struct_name, "application/x-gst-qt-name-tag"))
1279 continue;
1280 if (!(buf = gst_sample_get_buffer(sample)))
1281 continue;
1282 if ((size = gst_buffer_get_size(buf)) < 8)
1283 continue;
1284 size -= 8;
1285 if (!(stream->tags[WG_PARSER_TAG_NAME] = g_malloc(size + 1)))
1286 continue;
1287 if (gst_buffer_extract(buf, 8, stream->tags[WG_PARSER_TAG_NAME], size) != size)
1289 g_free(stream->tags[WG_PARSER_TAG_NAME]);
1290 stream->tags[WG_PARSER_TAG_NAME] = NULL;
1291 continue;
1293 stream->tags[WG_PARSER_TAG_NAME][size] = 0;
1297 if (!stream->tags[WG_PARSER_TAG_LANGUAGE])
1299 gchar *lang_code = NULL;
1301 gst_tag_list_get_string(tag_list, GST_TAG_LANGUAGE_CODE, &lang_code);
1302 if (stream->parser->sink_caps && !strcmp(stream->parser->sink_caps, "video/quicktime"))
1304 /* For QuickTime media, we convert the language tags to ISO 639-1. */
1305 const gchar *lang_code_iso_639_1 = lang_code ? gst_tag_get_language_code_iso_639_1(lang_code) : NULL;
1306 stream->tags[WG_PARSER_TAG_LANGUAGE] = lang_code_iso_639_1 ? g_strdup(lang_code_iso_639_1) : NULL;
1307 g_free(lang_code);
1309 else
1310 stream->tags[WG_PARSER_TAG_LANGUAGE] = lang_code;
1313 gst_event_unref(tag_event);
1317 static NTSTATUS wg_parser_connect(void *args)
1319 GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE("quartz_src",
1320 GST_PAD_SRC, GST_PAD_ALWAYS, GST_STATIC_CAPS_ANY);
1321 const struct wg_parser_connect_params *params = args;
1322 struct wg_parser *parser = params->parser;
1323 unsigned int i;
1324 int ret;
1326 parser->file_size = params->file_size;
1327 parser->sink_connected = true;
1329 if (!parser->bus)
1331 parser->bus = gst_bus_new();
1332 gst_bus_set_sync_handler(parser->bus, bus_handler_cb, parser, NULL);
1335 parser->container = gst_bin_new(NULL);
1336 gst_element_set_bus(parser->container, parser->bus);
1338 parser->my_src = gst_pad_new_from_static_template(&src_template, "quartz-src");
1339 gst_pad_set_getrange_function(parser->my_src, src_getrange_cb);
1340 gst_pad_set_query_function(parser->my_src, src_query_cb);
1341 gst_pad_set_activatemode_function(parser->my_src, src_activate_mode_cb);
1342 gst_pad_set_event_function(parser->my_src, src_event_cb);
1343 gst_pad_set_element_private(parser->my_src, parser);
1345 parser->start_offset = parser->next_offset = parser->stop_offset = 0;
1346 parser->next_pull_offset = 0;
1347 parser->error = false;
1349 if (!parser->init_gst(parser))
1350 goto out;
1352 gst_element_set_state(parser->container, GST_STATE_PAUSED);
1353 ret = gst_element_get_state(parser->container, NULL, NULL, -1);
1354 if (ret == GST_STATE_CHANGE_FAILURE)
1356 GST_ERROR("Failed to play stream.\n");
1357 goto out;
1360 pthread_mutex_lock(&parser->mutex);
1362 while (!parser->no_more_pads && !parser->error)
1363 pthread_cond_wait(&parser->init_cond, &parser->mutex);
1364 if (parser->error)
1366 pthread_mutex_unlock(&parser->mutex);
1367 goto out;
1370 for (i = 0; i < parser->stream_count; ++i)
1372 struct wg_parser_stream *stream = parser->streams[i];
1373 gint64 duration;
1375 /* If we received a buffer, waiting for tags or caps does not make sense anymore. */
1376 while ((!stream->has_caps || !stream->has_tags) && !parser->error && !stream->has_buffer)
1377 pthread_cond_wait(&parser->init_cond, &parser->mutex);
1379 /* GStreamer doesn't actually provide any guarantees about when duration
1380 * is available, even for seekable streams. It's basically built for
1381 * applications that don't care, e.g. movie players that can display
1382 * a duration once it's available, and update it visually if a better
1383 * estimate is found. This doesn't really match well with DirectShow or
1384 * Media Foundation, which both expect duration to be available
1385 * immediately on connecting, so we have to use some complex heuristics
1386 * to try to actually get a usable duration.
1388 * Some elements (avidemux, wavparse, qtdemux) record duration almost
1389 * immediately, before fixing caps. Such elements don't send
1390 * duration-changed messages. Therefore always try querying duration
1391 * after caps have been found.
1393 * Some elements (mpegaudioparse) send duration-changed. In the case of
1394 * a mp3 stream without seek tables it will not be sent immediately, but
1395 * only after enough frames have been parsed to form an estimate. They
1396 * may send it multiple times with increasingly accurate estimates, but
1397 * unfortunately we have no way of knowing whether another estimate will
1398 * be sent, so we always take the first one. We assume that if the
1399 * duration is not immediately available then the element will always
1400 * send duration-changed.
1403 for (;;)
1405 if (parser->error)
1407 pthread_mutex_unlock(&parser->mutex);
1408 goto out;
1410 if (gst_pad_query_duration(stream->their_src, GST_FORMAT_TIME, &duration))
1412 stream->duration = duration / 100;
1413 break;
1416 if (stream->eos)
1418 stream->duration = 0;
1419 GST_WARNING("Failed to query duration.\n");
1420 break;
1423 /* Elements based on GstBaseParse send duration-changed before
1424 * actually updating the duration in GStreamer versions prior
1425 * to 1.17.1. See <gstreamer.git:d28e0b4147fe7073b2>. So after
1426 * receiving duration-changed we have to continue polling until
1427 * the query succeeds. */
1428 if (parser->has_duration)
1430 pthread_mutex_unlock(&parser->mutex);
1431 g_usleep(10000);
1432 pthread_mutex_lock(&parser->mutex);
1434 else
1436 pthread_cond_wait(&parser->init_cond, &parser->mutex);
1440 query_tags(stream);
1442 /* Now that we're fully initialized, enable the stream so that further
1443 * samples get queued instead of being discarded. We don't actually need
1444 * the samples (in particular, the frontend should seek before
1445 * attempting to read anything), but we don't want to waste CPU time
1446 * trying to decode them. */
1447 stream->enabled = true;
1450 pthread_mutex_unlock(&parser->mutex);
1452 parser->next_offset = 0;
1453 return S_OK;
1455 out:
1456 if (parser->container)
1457 gst_element_set_state(parser->container, GST_STATE_NULL);
1458 if (parser->their_sink)
1460 gst_object_unref(parser->their_sink);
1461 parser->my_src = parser->their_sink = NULL;
1464 for (i = 0; i < parser->stream_count; ++i)
1465 free_stream(parser->streams[i]);
1466 parser->stream_count = 0;
1467 free(parser->streams);
1468 parser->streams = NULL;
1470 if (parser->container)
1472 gst_element_set_bus(parser->container, NULL);
1473 gst_object_unref(parser->container);
1474 parser->container = NULL;
1477 g_free(parser->sink_caps);
1478 parser->sink_caps = NULL;
1480 pthread_mutex_lock(&parser->mutex);
1481 parser->sink_connected = false;
1482 pthread_mutex_unlock(&parser->mutex);
1483 pthread_cond_signal(&parser->read_cond);
1485 return E_FAIL;
1488 static NTSTATUS wg_parser_disconnect(void *args)
1490 struct wg_parser *parser = args;
1491 unsigned int i;
1493 /* Unblock all of our streams. */
1494 pthread_mutex_lock(&parser->mutex);
1495 for (i = 0; i < parser->stream_count; ++i)
1497 parser->streams[i]->flushing = true;
1498 pthread_cond_signal(&parser->streams[i]->event_empty_cond);
1500 pthread_mutex_unlock(&parser->mutex);
1502 gst_element_set_state(parser->container, GST_STATE_NULL);
1503 gst_object_unref(parser->my_src);
1504 gst_object_unref(parser->their_sink);
1505 parser->my_src = parser->their_sink = NULL;
1507 pthread_mutex_lock(&parser->mutex);
1508 parser->sink_connected = false;
1509 pthread_mutex_unlock(&parser->mutex);
1510 pthread_cond_signal(&parser->read_cond);
1512 for (i = 0; i < parser->stream_count; ++i)
1513 free_stream(parser->streams[i]);
1515 parser->stream_count = 0;
1516 free(parser->streams);
1517 parser->streams = NULL;
1519 gst_element_set_bus(parser->container, NULL);
1520 gst_object_unref(parser->container);
1521 parser->container = NULL;
1523 g_free(parser->sink_caps);
1524 parser->sink_caps = NULL;
1526 return S_OK;
1529 static BOOL decodebin_parser_init_gst(struct wg_parser *parser)
1531 GstElement *element;
1532 int ret;
1534 if (!(element = create_element("decodebin", "base")))
1535 return FALSE;
1537 gst_bin_add(GST_BIN(parser->container), element);
1538 parser->decodebin = element;
1540 if (parser->unlimited_buffering)
1542 g_object_set(parser->decodebin, "max-size-buffers", G_MAXUINT, NULL);
1543 g_object_set(parser->decodebin, "max-size-time", G_MAXUINT64, NULL);
1544 g_object_set(parser->decodebin, "max-size-bytes", G_MAXUINT, NULL);
1547 g_signal_connect(element, "pad-added", G_CALLBACK(pad_added_cb), parser);
1548 g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser);
1549 g_signal_connect(element, "autoplug-select", G_CALLBACK(autoplug_select_cb), parser);
1550 g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
1552 parser->their_sink = gst_element_get_static_pad(element, "sink");
1554 pthread_mutex_lock(&parser->mutex);
1555 parser->no_more_pads = false;
1556 pthread_mutex_unlock(&parser->mutex);
1558 if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
1560 GST_ERROR("Failed to link pads, error %d.", ret);
1561 return FALSE;
1564 return TRUE;
1567 static BOOL avi_parser_init_gst(struct wg_parser *parser)
1569 GstElement *element;
1570 int ret;
1572 if (!(element = create_element("avidemux", "good")))
1573 return FALSE;
1575 gst_bin_add(GST_BIN(parser->container), element);
1577 g_signal_connect(element, "pad-added", G_CALLBACK(pad_added_cb), parser);
1578 g_signal_connect(element, "pad-removed", G_CALLBACK(pad_removed_cb), parser);
1579 g_signal_connect(element, "no-more-pads", G_CALLBACK(no_more_pads_cb), parser);
1581 parser->their_sink = gst_element_get_static_pad(element, "sink");
1583 pthread_mutex_lock(&parser->mutex);
1584 parser->no_more_pads = false;
1585 pthread_mutex_unlock(&parser->mutex);
1587 if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
1589 GST_ERROR("Failed to link pads, error %d.", ret);
1590 return FALSE;
1593 return TRUE;
1596 static BOOL mpeg_audio_parser_init_gst(struct wg_parser *parser)
1598 struct wg_parser_stream *stream;
1599 GstElement *element;
1600 int ret;
1602 if (!(element = create_element("mpegaudioparse", "good")))
1603 return FALSE;
1605 gst_bin_add(GST_BIN(parser->container), element);
1607 parser->their_sink = gst_element_get_static_pad(element, "sink");
1608 if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
1610 GST_ERROR("Failed to link sink pads, error %d.", ret);
1611 return FALSE;
1614 if (!(stream = create_stream(parser)))
1615 return FALSE;
1617 gst_object_ref(stream->their_src = gst_element_get_static_pad(element, "src"));
1618 if ((ret = gst_pad_link(stream->their_src, stream->my_sink)) < 0)
1620 GST_ERROR("Failed to link source pads, error %d.", ret);
1621 return FALSE;
1623 gst_pad_set_active(stream->my_sink, 1);
1625 parser->no_more_pads = true;
1627 return TRUE;
1630 static BOOL wave_parser_init_gst(struct wg_parser *parser)
1632 struct wg_parser_stream *stream;
1633 GstElement *element;
1634 int ret;
1636 if (!(element = create_element("wavparse", "good")))
1637 return FALSE;
1639 gst_bin_add(GST_BIN(parser->container), element);
1641 parser->their_sink = gst_element_get_static_pad(element, "sink");
1642 if ((ret = gst_pad_link(parser->my_src, parser->their_sink)) < 0)
1644 GST_ERROR("Failed to link sink pads, error %d.", ret);
1645 return FALSE;
1648 if (!(stream = create_stream(parser)))
1649 return FALSE;
1651 stream->their_src = gst_element_get_static_pad(element, "src");
1652 gst_object_ref(stream->their_src);
1653 if ((ret = gst_pad_link(stream->their_src, stream->my_sink)) < 0)
1655 GST_ERROR("Failed to link source pads, error %d.", ret);
1656 return FALSE;
1658 gst_pad_set_active(stream->my_sink, 1);
1660 parser->no_more_pads = true;
1662 return TRUE;
1665 static NTSTATUS wg_parser_create(void *args)
1667 static const init_gst_cb init_funcs[] =
1669 [WG_PARSER_DECODEBIN] = decodebin_parser_init_gst,
1670 [WG_PARSER_AVIDEMUX] = avi_parser_init_gst,
1671 [WG_PARSER_MPEGAUDIOPARSE] = mpeg_audio_parser_init_gst,
1672 [WG_PARSER_WAVPARSE] = wave_parser_init_gst,
1675 struct wg_parser_create_params *params = args;
1676 struct wg_parser *parser;
1678 if (!(parser = calloc(1, sizeof(*parser))))
1679 return E_OUTOFMEMORY;
1681 pthread_mutex_init(&parser->mutex, NULL);
1682 pthread_cond_init(&parser->init_cond, NULL);
1683 pthread_cond_init(&parser->read_cond, NULL);
1684 pthread_cond_init(&parser->read_done_cond, NULL);
1685 parser->init_gst = init_funcs[params->type];
1686 parser->unlimited_buffering = params->unlimited_buffering;
1687 parser->err_on = params->err_on;
1688 parser->warn_on = params->warn_on;
1689 GST_DEBUG("Created winegstreamer parser %p.", parser);
1690 params->parser = parser;
1691 return S_OK;
1694 static NTSTATUS wg_parser_destroy(void *args)
1696 struct wg_parser *parser = args;
1698 if (parser->bus)
1700 gst_bus_set_sync_handler(parser->bus, NULL, NULL, NULL);
1701 gst_object_unref(parser->bus);
1704 pthread_mutex_destroy(&parser->mutex);
1705 pthread_cond_destroy(&parser->init_cond);
1706 pthread_cond_destroy(&parser->read_cond);
1707 pthread_cond_destroy(&parser->read_done_cond);
1709 free(parser);
1710 return S_OK;
1713 const unixlib_entry_t __wine_unix_call_funcs[] =
1715 #define X(name) [unix_ ## name] = name
1716 X(wg_init_gstreamer),
1718 X(wg_parser_create),
1719 X(wg_parser_destroy),
1721 X(wg_parser_connect),
1722 X(wg_parser_disconnect),
1724 X(wg_parser_get_next_read_offset),
1725 X(wg_parser_push_data),
1727 X(wg_parser_get_stream_count),
1728 X(wg_parser_get_stream),
1730 X(wg_parser_stream_get_preferred_format),
1731 X(wg_parser_stream_enable),
1732 X(wg_parser_stream_disable),
1734 X(wg_parser_stream_get_buffer),
1735 X(wg_parser_stream_copy_buffer),
1736 X(wg_parser_stream_release_buffer),
1737 X(wg_parser_stream_notify_qos),
1739 X(wg_parser_stream_get_duration),
1740 X(wg_parser_stream_get_tag),
1741 X(wg_parser_stream_seek),
1743 X(wg_transform_create),
1744 X(wg_transform_destroy),
1745 X(wg_transform_set_output_format),
1747 X(wg_transform_push_data),
1748 X(wg_transform_read_data),
1749 X(wg_transform_get_status),