d3dx9: Unify calling parse_mesh helper functions.
[wine.git] / dlls / winegstreamer / wg_muxer.c
blobc0614807459c0e2712e13ea8b860914fc97adfb9
1 /*
2 * GStreamer muxer backend
4 * Copyright 2023 Ziqing Hui for CodeWeavers
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 * wg_muxer will autoplug gstreamer muxer and parser elements.
23 * It creates a pipeline like this:
25 * ------------------- -------
26 * [my_src 1] ==> |parser 1 (optional)| ==> | |
27 * ------------------- | |
28 * | |
29 * ------------------- | |
30 * [my_src 2] ==> |parser 2 (optional)| ==> | |
31 * ------------------- | |
32 * | muxer | ==> [my_sink]
33 * | |
34 * [ ...... ] | |
35 * | |
36 * | |
37 * ------------------- | |
38 * [my_src n] ==> |parser n (optional)| ==> | |
39 * ------------------- -------
42 #if 0
43 #pragma makedep unix
44 #endif
46 #include <stdio.h>
48 #include "ntstatus.h"
49 #define WIN32_NO_STATUS
50 #include "winternl.h"
52 #include "unix_private.h"
54 #include "wine/list.h"
56 struct wg_muxer
58 GstElement *container, *muxer;
59 GstPad *my_sink;
60 GstCaps *my_sink_caps;
62 GstAtomicQueue *output_queue;
63 GstBuffer *buffer;
65 pthread_mutex_t mutex;
66 guint64 offset; /* Write offset of the output buffer generated by muxer. */
68 struct list streams;
71 struct wg_muxer_stream
73 struct wg_muxer *muxer;
74 struct wg_format format;
75 uint32_t id;
77 GstPad *my_src;
78 GstCaps *my_src_caps, *parser_src_caps;
79 GstElement *parser;
80 GstSegment segment;
82 struct list entry;
85 static struct wg_muxer *get_muxer(wg_muxer_t muxer)
87 return (struct wg_muxer *)(ULONG_PTR)muxer;
90 static struct wg_muxer_stream *muxer_get_stream_by_id(struct wg_muxer *muxer, DWORD id)
92 struct wg_muxer_stream *stream;
94 LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
96 if (stream->id == id)
97 return stream;
100 return NULL;
103 static bool muxer_try_muxer_factory(struct wg_muxer *muxer, GstElementFactory *muxer_factory)
105 struct wg_muxer_stream *stream;
107 GST_INFO("Trying %"GST_PTR_FORMAT".", muxer_factory);
109 LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
111 GstCaps *caps = stream->parser ? stream->parser_src_caps : stream->my_src_caps;
113 if (!gst_element_factory_can_sink_any_caps(muxer_factory, caps))
115 GST_INFO("%"GST_PTR_FORMAT" cannot sink stream %u %p, caps %"GST_PTR_FORMAT,
116 muxer_factory, stream->id, stream, caps);
117 return false;
121 return true;
124 static GstElement *muxer_find_muxer(struct wg_muxer *muxer)
126 /* Some muxers are formatter, eg. id3mux. */
127 GstElementFactoryListType muxer_type = GST_ELEMENT_FACTORY_TYPE_MUXER | GST_ELEMENT_FACTORY_TYPE_FORMATTER;
128 GstElement *element = NULL;
129 GList *muxers, *tmp;
131 GST_DEBUG("muxer %p.", muxer);
133 muxers = find_element_factories(muxer_type, GST_RANK_NONE, NULL, muxer->my_sink_caps);
135 for (tmp = muxers; tmp && !element; tmp = tmp->next)
137 GstElementFactory *factory = GST_ELEMENT_FACTORY(tmp->data);
138 if (muxer_try_muxer_factory(muxer, factory))
139 element = factory_create_element(factory);
142 gst_plugin_feature_list_free(muxers);
144 if (!element)
145 GST_WARNING("Failed to find any compatible muxer element.");
147 return element;
150 static gboolean muxer_sink_query_cb(GstPad *pad, GstObject *parent, GstQuery *query)
152 struct wg_muxer *muxer = gst_pad_get_element_private(pad);
154 GST_DEBUG("pad %p, parent %p, query %p, muxer %p, type \"%s\".",
155 pad, parent, query, muxer, gst_query_type_get_name(query->type));
157 switch (query->type)
159 case GST_QUERY_SEEKING:
160 gst_query_set_seeking(query, GST_FORMAT_BYTES, true, 0, -1);
161 return true;
162 default:
163 GST_WARNING("Ignoring \"%s\" query.", gst_query_type_get_name(query->type));
164 return gst_pad_query_default(pad, parent, query);
168 static gboolean muxer_sink_event_cb(GstPad *pad, GstObject *parent, GstEvent *event)
170 struct wg_muxer *muxer = gst_pad_get_element_private(pad);
171 const GstSegment *segment;
173 GST_DEBUG("pad %p, parent %p, event %p, muxer %p, type \"%s\".",
174 pad, parent, event, muxer, GST_EVENT_TYPE_NAME(event));
176 switch (event->type)
178 case GST_EVENT_SEGMENT:
179 pthread_mutex_lock(&muxer->mutex);
181 gst_event_parse_segment(event, &segment);
182 if (segment->format != GST_FORMAT_BYTES)
184 pthread_mutex_unlock(&muxer->mutex);
185 GST_FIXME("Unhandled segment format \"%s\".", gst_format_get_name(segment->format));
186 break;
188 muxer->offset = segment->start;
190 pthread_mutex_unlock(&muxer->mutex);
191 break;
193 default:
194 GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event));
195 break;
198 gst_event_unref(event);
199 return TRUE;
202 static GstFlowReturn muxer_sink_chain_cb(GstPad *pad, GstObject *parent, GstBuffer *buffer)
204 GstBuffer *buffer_writable= gst_buffer_make_writable(buffer);
205 struct wg_muxer *muxer = gst_pad_get_element_private(pad);
207 GST_DEBUG("muxer %p, pad %"GST_PTR_FORMAT", parent %"GST_PTR_FORMAT", buffer <%"GST_PTR_FORMAT">.",
208 muxer, pad, parent, buffer);
210 pthread_mutex_lock(&muxer->mutex);
212 GST_BUFFER_OFFSET(buffer_writable) = GST_BUFFER_OFFSET_NONE;
213 if (muxer->offset != GST_BUFFER_OFFSET_NONE)
215 GST_BUFFER_OFFSET(buffer_writable) = muxer->offset;
216 muxer->offset = GST_BUFFER_OFFSET_NONE;
219 gst_atomic_queue_push(muxer->output_queue, buffer_writable);
221 GST_DEBUG("Pushed writable buffer <%"GST_PTR_FORMAT"> to output queue %p, %u buffers in queue now.",
222 buffer_writable, muxer->output_queue, gst_atomic_queue_length(muxer->output_queue));
224 pthread_mutex_unlock(&muxer->mutex);
226 return GST_FLOW_OK;
229 static void stream_free(struct wg_muxer_stream *stream)
231 if (stream->parser_src_caps)
232 gst_caps_unref(stream->parser_src_caps);
233 gst_object_unref(stream->my_src);
234 gst_caps_unref(stream->my_src_caps);
235 free(stream);
238 NTSTATUS wg_muxer_create(void *args)
240 struct wg_muxer_create_params *params = args;
241 NTSTATUS status = STATUS_UNSUCCESSFUL;
242 GstPadTemplate *template = NULL;
243 struct wg_muxer *muxer;
245 /* Create wg_muxer object. */
246 if (!(muxer = calloc(1, sizeof(*muxer))))
247 return STATUS_NO_MEMORY;
248 list_init(&muxer->streams);
249 muxer->offset = GST_BUFFER_OFFSET_NONE;
250 pthread_mutex_init(&muxer->mutex, NULL);
251 if (!(muxer->container = gst_bin_new("wg_muxer")))
252 goto out;
253 if (!(muxer->output_queue = gst_atomic_queue_new(8)))
254 goto out;
256 /* Create sink pad. */
257 if (!(muxer->my_sink_caps = gst_caps_from_string(params->format)))
259 GST_ERROR("Failed to get caps from format string: \"%s\".", params->format);
260 goto out;
262 if (!(template = gst_pad_template_new("sink", GST_PAD_SINK, GST_PAD_ALWAYS, muxer->my_sink_caps)))
263 goto out;
264 muxer->my_sink = gst_pad_new_from_template(template, "wg_muxer_sink");
265 if (!muxer->my_sink)
266 goto out;
267 gst_pad_set_element_private(muxer->my_sink, muxer);
268 gst_pad_set_query_function(muxer->my_sink, muxer_sink_query_cb);
269 gst_pad_set_event_function(muxer->my_sink, muxer_sink_event_cb);
270 gst_pad_set_chain_function(muxer->my_sink, muxer_sink_chain_cb);
272 gst_object_unref(template);
274 GST_INFO("Created winegstreamer muxer %p.", muxer);
275 params->muxer = (wg_transform_t)(ULONG_PTR)muxer;
277 return STATUS_SUCCESS;
279 out:
280 if (muxer->my_sink)
281 gst_object_unref(muxer->my_sink);
282 if (template)
283 gst_object_unref(template);
284 if (muxer->my_sink_caps)
285 gst_caps_unref(muxer->my_sink_caps);
286 if (muxer->output_queue)
287 gst_atomic_queue_unref(muxer->output_queue);
288 if (muxer->container)
289 gst_object_unref(muxer->container);
290 pthread_mutex_destroy(&muxer->mutex);
291 free(muxer);
293 return status;
296 NTSTATUS wg_muxer_destroy(void *args)
298 struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args);
299 struct wg_muxer_stream *stream, *next;
300 GstBuffer *buffer;
302 LIST_FOR_EACH_ENTRY_SAFE(stream, next, &muxer->streams, struct wg_muxer_stream, entry)
304 list_remove(&stream->entry);
305 stream_free(stream);
308 if (muxer->buffer)
309 gst_buffer_unref(muxer->buffer);
311 while ((buffer = gst_atomic_queue_pop(muxer->output_queue)))
312 gst_buffer_unref(buffer);
313 gst_atomic_queue_unref(muxer->output_queue);
315 gst_object_unref(muxer->my_sink);
316 gst_caps_unref(muxer->my_sink_caps);
317 gst_element_set_state(muxer->container, GST_STATE_NULL);
318 gst_object_unref(muxer->container);
320 pthread_mutex_destroy(&muxer->mutex);
322 free(muxer);
324 return S_OK;
327 NTSTATUS wg_muxer_add_stream(void *args)
329 struct wg_muxer_add_stream_params *params = args;
330 struct wg_muxer *muxer = get_muxer(params->muxer);
331 NTSTATUS status = STATUS_UNSUCCESSFUL;
332 GstPadTemplate *template = NULL;
333 struct wg_muxer_stream *stream;
334 char src_pad_name[64];
336 GST_DEBUG("muxer %p, stream %u, format %p.", muxer, params->stream_id, params->format);
338 /* Create stream object. */
339 if (!(stream = calloc(1, sizeof(*stream))))
340 return STATUS_NO_MEMORY;
341 stream->muxer = muxer;
342 stream->format = *params->format;
343 stream->id = params->stream_id;
345 /* Create stream my_src pad. */
346 if (!(stream->my_src_caps = wg_format_to_caps(params->format)))
347 goto out;
348 if (!(template = gst_pad_template_new("src", GST_PAD_SRC, GST_PAD_ALWAYS, stream->my_src_caps)))
349 goto out;
350 sprintf(src_pad_name, "wg_muxer_stream_src_%u", stream->id);
351 if (!(stream->my_src = gst_pad_new_from_template(template, src_pad_name)))
352 goto out;
353 gst_pad_set_element_private(stream->my_src, stream);
355 /* Create parser. */
356 if ((stream->parser = find_element(GST_ELEMENT_FACTORY_TYPE_PARSER, stream->my_src_caps, NULL)))
358 GstPad *parser_src;
360 if (!gst_bin_add(GST_BIN(muxer->container), stream->parser)
361 || !link_src_to_element(stream->my_src, stream->parser))
362 goto out;
364 parser_src = gst_element_get_static_pad(stream->parser, "src");
365 stream->parser_src_caps = gst_pad_query_caps(parser_src, NULL);
366 GST_INFO("Created parser %"GST_PTR_FORMAT" for stream %u %p.",
367 stream->parser, stream->id, stream);
368 gst_object_unref(parser_src);
371 /* Add to muxer stream list. */
372 list_add_tail(&muxer->streams, &stream->entry);
374 gst_object_unref(template);
376 GST_INFO("Created winegstreamer muxer stream %p.", stream);
378 return STATUS_SUCCESS;
380 out:
381 if (stream->parser)
382 gst_object_unref(stream->parser);
383 if (stream->my_src)
384 gst_object_unref(stream->my_src);
385 if (template)
386 gst_object_unref(template);
387 if (stream->my_src_caps)
388 gst_caps_unref(stream->my_src_caps);
389 free(stream);
391 return status;
394 NTSTATUS wg_muxer_start(void *args)
396 struct wg_muxer *muxer = get_muxer(*(wg_muxer_t *)args);
397 NTSTATUS status = STATUS_UNSUCCESSFUL;
398 struct wg_muxer_stream *stream;
400 GST_DEBUG("muxer %p.", muxer);
402 /* Create muxer element. */
403 if (!(muxer->muxer = muxer_find_muxer(muxer))
404 || !gst_bin_add(GST_BIN(muxer->container), muxer->muxer))
405 return status;
407 /* Link muxer element to my_sink */
408 if (!link_element_to_sink(muxer->muxer, muxer->my_sink)
409 || !gst_pad_set_active(muxer->my_sink, 1))
410 return status;
412 /* Link each stream to muxer element. */
413 LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
415 bool link_ok = stream->parser ?
416 gst_element_link(stream->parser, muxer->muxer) :
417 link_src_to_element(stream->my_src, muxer->muxer);
419 if (!link_ok)
420 return status;
423 /* Set to pause state. */
424 if (gst_element_set_state(muxer->container, GST_STATE_PAUSED) == GST_STATE_CHANGE_FAILURE
425 || gst_element_get_state(muxer->container, NULL, NULL, -1) == GST_STATE_CHANGE_FAILURE)
426 return status;
428 /* Active stream my_src pad and push events to prepare for streaming. */
429 LIST_FOR_EACH_ENTRY(stream, &muxer->streams, struct wg_muxer_stream, entry)
431 char buffer[64];
433 sprintf(buffer, "wg_muxer_stream_src_%u", stream->id);
434 gst_segment_init(&stream->segment, GST_FORMAT_BYTES);
435 if (!gst_pad_set_active(stream->my_src, 1))
436 return status;
437 if (!push_event(stream->my_src, gst_event_new_stream_start(buffer))
438 || !push_event(stream->my_src, gst_event_new_caps(stream->my_src_caps))
439 || !push_event(stream->my_src, gst_event_new_segment(&stream->segment)))
440 return status;
443 GST_DEBUG("Started muxer %p.", muxer);
445 return STATUS_SUCCESS;
448 NTSTATUS wg_muxer_push_sample(void *args)
450 struct wg_muxer_push_sample_params *params = args;
451 struct wg_muxer *muxer = get_muxer(params->muxer);
452 struct wg_sample *sample = params->sample;
453 struct wg_muxer_stream *stream;
454 GstFlowReturn ret;
455 GstBuffer *buffer;
457 if (!(stream = muxer_get_stream_by_id(muxer, params->stream_id)))
458 return STATUS_NOT_FOUND;
460 /* Create sample data buffer. */
461 if (!(buffer = gst_buffer_new_and_alloc(sample->size))
462 || !gst_buffer_fill(buffer, 0, wg_sample_data(sample), sample->size))
464 GST_ERROR("Failed to allocate input buffer.");
465 return STATUS_NO_MEMORY;
468 GST_INFO("Copied %u bytes from sample %p to buffer %p.", sample->size, sample, buffer);
470 /* Set sample properties. */
471 if (sample->flags & WG_SAMPLE_FLAG_HAS_PTS)
472 GST_BUFFER_PTS(buffer) = sample->pts * 100;
473 if (sample->flags & WG_SAMPLE_FLAG_HAS_DURATION)
474 GST_BUFFER_DURATION(buffer) = sample->duration * 100;
475 if (!(sample->flags & WG_SAMPLE_FLAG_SYNC_POINT))
476 GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT);
477 if (sample->flags & WG_SAMPLE_FLAG_DISCONTINUITY)
478 GST_BUFFER_FLAG_SET(buffer, GST_BUFFER_FLAG_DISCONT);
480 /* Push sample data buffer to stream src pad. */
481 if ((ret = gst_pad_push(stream->my_src, buffer)) < 0)
483 GST_ERROR("Failed to push buffer %p to pad %s, reason %s.",
484 buffer, gst_pad_get_name(stream->my_src), gst_flow_get_name(ret));
485 return STATUS_UNSUCCESSFUL;
488 return STATUS_SUCCESS;
491 NTSTATUS wg_muxer_read_data(void *args)
493 struct wg_muxer_read_data_params *params = args;
494 struct wg_muxer *muxer = get_muxer(params->muxer);
495 gsize size, copied;
497 /* Pop buffer from output queue. */
498 if (!muxer->buffer)
500 if (!(muxer->buffer = gst_atomic_queue_pop(muxer->output_queue)))
501 return STATUS_NO_MEMORY;
503 /* We may continuously read data from a same buffer multiple times.
504 * But we only need to set the offset at the first reading. */
505 if (GST_BUFFER_OFFSET_IS_VALID(muxer->buffer))
506 params->offset = GST_BUFFER_OFFSET(muxer->buffer);
509 /* Copy data. */
510 size = min(gst_buffer_get_size(muxer->buffer), params->size);
511 copied = gst_buffer_extract(muxer->buffer, 0, params->buffer, size);
512 params->size = copied;
514 GST_INFO("Copied %"G_GSIZE_FORMAT" bytes from buffer <%"GST_PTR_FORMAT">", copied, muxer->buffer);
516 /* Unref buffer if all data is read. */
517 gst_buffer_resize(muxer->buffer, (gssize)copied, -1);
518 if (!gst_buffer_get_size(muxer->buffer))
520 gst_buffer_unref(muxer->buffer);
521 muxer->buffer = NULL;
524 return STATUS_SUCCESS;