2 * GStreamer muxer backend
4 * Copyright 2023 Ziqing Hui for CodeWeavers
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA
22 * wg_muxer will autoplug gstreamer muxer and parser elements.
23 * It creates a pipeline like this:
25 * ------------------- -------
26 * [my_src 1] ==> |parser 1 (optional)| ==> | |
27 * ------------------- | |
29 * ------------------- | |
30 * [my_src 2] ==> |parser 2 (optional)| ==> | |
31 * ------------------- | |
32 * | muxer | ==> [my_sink]
37 * ------------------- | |
38 * [my_src n] ==> |parser n (optional)| ==> | |
39 * ------------------- -------
49 #define WIN32_NO_STATUS
52 #include "unix_private.h"
54 #include "wine/list.h"
58 GstElement
*container
, *muxer
;
60 GstCaps
*my_sink_caps
;
62 GstAtomicQueue
*output_queue
;
65 pthread_mutex_t mutex
;
66 guint64 offset
; /* Write offset of the output buffer generated by muxer. */
71 struct wg_muxer_stream
73 struct wg_muxer
*muxer
;
74 struct wg_format format
;
78 GstCaps
*my_src_caps
, *parser_src_caps
;
85 static struct wg_muxer
*get_muxer(wg_muxer_t muxer
)
87 return (struct wg_muxer
*)(ULONG_PTR
)muxer
;
90 static struct wg_muxer_stream
*muxer_get_stream_by_id(struct wg_muxer
*muxer
, DWORD id
)
92 struct wg_muxer_stream
*stream
;
94 LIST_FOR_EACH_ENTRY(stream
, &muxer
->streams
, struct wg_muxer_stream
, entry
)
103 static bool muxer_try_muxer_factory(struct wg_muxer
*muxer
, GstElementFactory
*muxer_factory
)
105 struct wg_muxer_stream
*stream
;
107 GST_INFO("Trying %"GST_PTR_FORMAT
".", muxer_factory
);
109 LIST_FOR_EACH_ENTRY(stream
, &muxer
->streams
, struct wg_muxer_stream
, entry
)
111 GstCaps
*caps
= stream
->parser
? stream
->parser_src_caps
: stream
->my_src_caps
;
113 if (!gst_element_factory_can_sink_any_caps(muxer_factory
, caps
))
115 GST_INFO("%"GST_PTR_FORMAT
" cannot sink stream %u %p, caps %"GST_PTR_FORMAT
,
116 muxer_factory
, stream
->id
, stream
, caps
);
124 static GstElement
*muxer_find_muxer(struct wg_muxer
*muxer
)
126 /* Some muxers are formatter, eg. id3mux. */
127 GstElementFactoryListType muxer_type
= GST_ELEMENT_FACTORY_TYPE_MUXER
| GST_ELEMENT_FACTORY_TYPE_FORMATTER
;
128 GstElement
*element
= NULL
;
131 GST_DEBUG("muxer %p.", muxer
);
133 muxers
= find_element_factories(muxer_type
, GST_RANK_NONE
, NULL
, muxer
->my_sink_caps
);
135 for (tmp
= muxers
; tmp
&& !element
; tmp
= tmp
->next
)
137 GstElementFactory
*factory
= GST_ELEMENT_FACTORY(tmp
->data
);
138 if (muxer_try_muxer_factory(muxer
, factory
))
139 element
= factory_create_element(factory
);
142 gst_plugin_feature_list_free(muxers
);
145 GST_WARNING("Failed to find any compatible muxer element.");
150 static gboolean
muxer_sink_query_cb(GstPad
*pad
, GstObject
*parent
, GstQuery
*query
)
152 struct wg_muxer
*muxer
= gst_pad_get_element_private(pad
);
154 GST_DEBUG("pad %p, parent %p, query %p, muxer %p, type \"%s\".",
155 pad
, parent
, query
, muxer
, gst_query_type_get_name(query
->type
));
159 case GST_QUERY_SEEKING
:
160 gst_query_set_seeking(query
, GST_FORMAT_BYTES
, true, 0, -1);
163 GST_WARNING("Ignoring \"%s\" query.", gst_query_type_get_name(query
->type
));
164 return gst_pad_query_default(pad
, parent
, query
);
168 static gboolean
muxer_sink_event_cb(GstPad
*pad
, GstObject
*parent
, GstEvent
*event
)
170 struct wg_muxer
*muxer
= gst_pad_get_element_private(pad
);
171 const GstSegment
*segment
;
173 GST_DEBUG("pad %p, parent %p, event %p, muxer %p, type \"%s\".",
174 pad
, parent
, event
, muxer
, GST_EVENT_TYPE_NAME(event
));
178 case GST_EVENT_SEGMENT
:
179 pthread_mutex_lock(&muxer
->mutex
);
181 gst_event_parse_segment(event
, &segment
);
182 if (segment
->format
!= GST_FORMAT_BYTES
)
184 pthread_mutex_unlock(&muxer
->mutex
);
185 GST_FIXME("Unhandled segment format \"%s\".", gst_format_get_name(segment
->format
));
188 muxer
->offset
= segment
->start
;
190 pthread_mutex_unlock(&muxer
->mutex
);
194 GST_WARNING("Ignoring \"%s\" event.", GST_EVENT_TYPE_NAME(event
));
198 gst_event_unref(event
);
202 static GstFlowReturn
muxer_sink_chain_cb(GstPad
*pad
, GstObject
*parent
, GstBuffer
*buffer
)
204 GstBuffer
*buffer_writable
= gst_buffer_make_writable(buffer
);
205 struct wg_muxer
*muxer
= gst_pad_get_element_private(pad
);
207 GST_DEBUG("muxer %p, pad %"GST_PTR_FORMAT
", parent %"GST_PTR_FORMAT
", buffer <%"GST_PTR_FORMAT
">.",
208 muxer
, pad
, parent
, buffer
);
210 pthread_mutex_lock(&muxer
->mutex
);
212 GST_BUFFER_OFFSET(buffer_writable
) = GST_BUFFER_OFFSET_NONE
;
213 if (muxer
->offset
!= GST_BUFFER_OFFSET_NONE
)
215 GST_BUFFER_OFFSET(buffer_writable
) = muxer
->offset
;
216 muxer
->offset
= GST_BUFFER_OFFSET_NONE
;
219 gst_atomic_queue_push(muxer
->output_queue
, buffer_writable
);
221 GST_DEBUG("Pushed writable buffer <%"GST_PTR_FORMAT
"> to output queue %p, %u buffers in queue now.",
222 buffer_writable
, muxer
->output_queue
, gst_atomic_queue_length(muxer
->output_queue
));
224 pthread_mutex_unlock(&muxer
->mutex
);
229 static void stream_free(struct wg_muxer_stream
*stream
)
231 if (stream
->parser_src_caps
)
232 gst_caps_unref(stream
->parser_src_caps
);
233 gst_object_unref(stream
->my_src
);
234 gst_caps_unref(stream
->my_src_caps
);
238 NTSTATUS
wg_muxer_create(void *args
)
240 struct wg_muxer_create_params
*params
= args
;
241 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
242 GstPadTemplate
*template = NULL
;
243 struct wg_muxer
*muxer
;
245 /* Create wg_muxer object. */
246 if (!(muxer
= calloc(1, sizeof(*muxer
))))
247 return STATUS_NO_MEMORY
;
248 list_init(&muxer
->streams
);
249 muxer
->offset
= GST_BUFFER_OFFSET_NONE
;
250 pthread_mutex_init(&muxer
->mutex
, NULL
);
251 if (!(muxer
->container
= gst_bin_new("wg_muxer")))
253 if (!(muxer
->output_queue
= gst_atomic_queue_new(8)))
256 /* Create sink pad. */
257 if (!(muxer
->my_sink_caps
= gst_caps_from_string(params
->format
)))
259 GST_ERROR("Failed to get caps from format string: \"%s\".", params
->format
);
262 if (!(template = gst_pad_template_new("sink", GST_PAD_SINK
, GST_PAD_ALWAYS
, muxer
->my_sink_caps
)))
264 muxer
->my_sink
= gst_pad_new_from_template(template, "wg_muxer_sink");
267 gst_pad_set_element_private(muxer
->my_sink
, muxer
);
268 gst_pad_set_query_function(muxer
->my_sink
, muxer_sink_query_cb
);
269 gst_pad_set_event_function(muxer
->my_sink
, muxer_sink_event_cb
);
270 gst_pad_set_chain_function(muxer
->my_sink
, muxer_sink_chain_cb
);
272 gst_object_unref(template);
274 GST_INFO("Created winegstreamer muxer %p.", muxer
);
275 params
->muxer
= (wg_transform_t
)(ULONG_PTR
)muxer
;
277 return STATUS_SUCCESS
;
281 gst_object_unref(muxer
->my_sink
);
283 gst_object_unref(template);
284 if (muxer
->my_sink_caps
)
285 gst_caps_unref(muxer
->my_sink_caps
);
286 if (muxer
->output_queue
)
287 gst_atomic_queue_unref(muxer
->output_queue
);
288 if (muxer
->container
)
289 gst_object_unref(muxer
->container
);
290 pthread_mutex_destroy(&muxer
->mutex
);
296 NTSTATUS
wg_muxer_destroy(void *args
)
298 struct wg_muxer
*muxer
= get_muxer(*(wg_muxer_t
*)args
);
299 struct wg_muxer_stream
*stream
, *next
;
302 LIST_FOR_EACH_ENTRY_SAFE(stream
, next
, &muxer
->streams
, struct wg_muxer_stream
, entry
)
304 list_remove(&stream
->entry
);
309 gst_buffer_unref(muxer
->buffer
);
311 while ((buffer
= gst_atomic_queue_pop(muxer
->output_queue
)))
312 gst_buffer_unref(buffer
);
313 gst_atomic_queue_unref(muxer
->output_queue
);
315 gst_object_unref(muxer
->my_sink
);
316 gst_caps_unref(muxer
->my_sink_caps
);
317 gst_element_set_state(muxer
->container
, GST_STATE_NULL
);
318 gst_object_unref(muxer
->container
);
320 pthread_mutex_destroy(&muxer
->mutex
);
327 NTSTATUS
wg_muxer_add_stream(void *args
)
329 struct wg_muxer_add_stream_params
*params
= args
;
330 struct wg_muxer
*muxer
= get_muxer(params
->muxer
);
331 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
332 GstPadTemplate
*template = NULL
;
333 struct wg_muxer_stream
*stream
;
334 char src_pad_name
[64];
336 GST_DEBUG("muxer %p, stream %u, format %p.", muxer
, params
->stream_id
, params
->format
);
338 /* Create stream object. */
339 if (!(stream
= calloc(1, sizeof(*stream
))))
340 return STATUS_NO_MEMORY
;
341 stream
->muxer
= muxer
;
342 stream
->format
= *params
->format
;
343 stream
->id
= params
->stream_id
;
345 /* Create stream my_src pad. */
346 if (!(stream
->my_src_caps
= wg_format_to_caps(params
->format
)))
348 if (!(template = gst_pad_template_new("src", GST_PAD_SRC
, GST_PAD_ALWAYS
, stream
->my_src_caps
)))
350 sprintf(src_pad_name
, "wg_muxer_stream_src_%u", stream
->id
);
351 if (!(stream
->my_src
= gst_pad_new_from_template(template, src_pad_name
)))
353 gst_pad_set_element_private(stream
->my_src
, stream
);
356 if ((stream
->parser
= find_element(GST_ELEMENT_FACTORY_TYPE_PARSER
, stream
->my_src_caps
, NULL
)))
360 if (!gst_bin_add(GST_BIN(muxer
->container
), stream
->parser
)
361 || !link_src_to_element(stream
->my_src
, stream
->parser
))
364 parser_src
= gst_element_get_static_pad(stream
->parser
, "src");
365 stream
->parser_src_caps
= gst_pad_query_caps(parser_src
, NULL
);
366 GST_INFO("Created parser %"GST_PTR_FORMAT
" for stream %u %p.",
367 stream
->parser
, stream
->id
, stream
);
368 gst_object_unref(parser_src
);
371 /* Add to muxer stream list. */
372 list_add_tail(&muxer
->streams
, &stream
->entry
);
374 gst_object_unref(template);
376 GST_INFO("Created winegstreamer muxer stream %p.", stream
);
378 return STATUS_SUCCESS
;
382 gst_object_unref(stream
->parser
);
384 gst_object_unref(stream
->my_src
);
386 gst_object_unref(template);
387 if (stream
->my_src_caps
)
388 gst_caps_unref(stream
->my_src_caps
);
394 NTSTATUS
wg_muxer_start(void *args
)
396 struct wg_muxer
*muxer
= get_muxer(*(wg_muxer_t
*)args
);
397 NTSTATUS status
= STATUS_UNSUCCESSFUL
;
398 struct wg_muxer_stream
*stream
;
400 GST_DEBUG("muxer %p.", muxer
);
402 /* Create muxer element. */
403 if (!(muxer
->muxer
= muxer_find_muxer(muxer
))
404 || !gst_bin_add(GST_BIN(muxer
->container
), muxer
->muxer
))
407 /* Link muxer element to my_sink */
408 if (!link_element_to_sink(muxer
->muxer
, muxer
->my_sink
)
409 || !gst_pad_set_active(muxer
->my_sink
, 1))
412 /* Link each stream to muxer element. */
413 LIST_FOR_EACH_ENTRY(stream
, &muxer
->streams
, struct wg_muxer_stream
, entry
)
415 bool link_ok
= stream
->parser
?
416 gst_element_link(stream
->parser
, muxer
->muxer
) :
417 link_src_to_element(stream
->my_src
, muxer
->muxer
);
423 /* Set to pause state. */
424 if (gst_element_set_state(muxer
->container
, GST_STATE_PAUSED
) == GST_STATE_CHANGE_FAILURE
425 || gst_element_get_state(muxer
->container
, NULL
, NULL
, -1) == GST_STATE_CHANGE_FAILURE
)
428 /* Active stream my_src pad and push events to prepare for streaming. */
429 LIST_FOR_EACH_ENTRY(stream
, &muxer
->streams
, struct wg_muxer_stream
, entry
)
433 sprintf(buffer
, "wg_muxer_stream_src_%u", stream
->id
);
434 gst_segment_init(&stream
->segment
, GST_FORMAT_BYTES
);
435 if (!gst_pad_set_active(stream
->my_src
, 1))
437 if (!push_event(stream
->my_src
, gst_event_new_stream_start(buffer
))
438 || !push_event(stream
->my_src
, gst_event_new_caps(stream
->my_src_caps
))
439 || !push_event(stream
->my_src
, gst_event_new_segment(&stream
->segment
)))
443 GST_DEBUG("Started muxer %p.", muxer
);
445 return STATUS_SUCCESS
;
448 NTSTATUS
wg_muxer_push_sample(void *args
)
450 struct wg_muxer_push_sample_params
*params
= args
;
451 struct wg_muxer
*muxer
= get_muxer(params
->muxer
);
452 struct wg_sample
*sample
= params
->sample
;
453 struct wg_muxer_stream
*stream
;
457 if (!(stream
= muxer_get_stream_by_id(muxer
, params
->stream_id
)))
458 return STATUS_NOT_FOUND
;
460 /* Create sample data buffer. */
461 if (!(buffer
= gst_buffer_new_and_alloc(sample
->size
))
462 || !gst_buffer_fill(buffer
, 0, wg_sample_data(sample
), sample
->size
))
464 GST_ERROR("Failed to allocate input buffer.");
465 return STATUS_NO_MEMORY
;
468 GST_INFO("Copied %u bytes from sample %p to buffer %p.", sample
->size
, sample
, buffer
);
470 /* Set sample properties. */
471 if (sample
->flags
& WG_SAMPLE_FLAG_HAS_PTS
)
472 GST_BUFFER_PTS(buffer
) = sample
->pts
* 100;
473 if (sample
->flags
& WG_SAMPLE_FLAG_HAS_DURATION
)
474 GST_BUFFER_DURATION(buffer
) = sample
->duration
* 100;
475 if (!(sample
->flags
& WG_SAMPLE_FLAG_SYNC_POINT
))
476 GST_BUFFER_FLAG_SET(buffer
, GST_BUFFER_FLAG_DELTA_UNIT
);
477 if (sample
->flags
& WG_SAMPLE_FLAG_DISCONTINUITY
)
478 GST_BUFFER_FLAG_SET(buffer
, GST_BUFFER_FLAG_DISCONT
);
480 /* Push sample data buffer to stream src pad. */
481 if ((ret
= gst_pad_push(stream
->my_src
, buffer
)) < 0)
483 GST_ERROR("Failed to push buffer %p to pad %s, reason %s.",
484 buffer
, gst_pad_get_name(stream
->my_src
), gst_flow_get_name(ret
));
485 return STATUS_UNSUCCESSFUL
;
488 return STATUS_SUCCESS
;
491 NTSTATUS
wg_muxer_read_data(void *args
)
493 struct wg_muxer_read_data_params
*params
= args
;
494 struct wg_muxer
*muxer
= get_muxer(params
->muxer
);
497 /* Pop buffer from output queue. */
500 if (!(muxer
->buffer
= gst_atomic_queue_pop(muxer
->output_queue
)))
501 return STATUS_NO_MEMORY
;
503 /* We may continuously read data from a same buffer multiple times.
504 * But we only need to set the offset at the first reading. */
505 if (GST_BUFFER_OFFSET_IS_VALID(muxer
->buffer
))
506 params
->offset
= GST_BUFFER_OFFSET(muxer
->buffer
);
510 size
= min(gst_buffer_get_size(muxer
->buffer
), params
->size
);
511 copied
= gst_buffer_extract(muxer
->buffer
, 0, params
->buffer
, size
);
512 params
->size
= copied
;
514 GST_INFO("Copied %"G_GSIZE_FORMAT
" bytes from buffer <%"GST_PTR_FORMAT
">", copied
, muxer
->buffer
);
516 /* Unref buffer if all data is read. */
517 gst_buffer_resize(muxer
->buffer
, (gssize
)copied
, -1);
518 if (!gst_buffer_get_size(muxer
->buffer
))
520 gst_buffer_unref(muxer
->buffer
);
521 muxer
->buffer
= NULL
;
524 return STATUS_SUCCESS
;