2 * Copyright (c) 2004 Roman Shaposhnik
3 * Copyright (c) 2008 Alexander Strange (astrange@ithinksw.com)
5 * Many thanks to Steven M. Schultz for providing clever ideas and
6 * to Michael Niedermayer <michaelni@gmx.at> for writing initial
9 * This file is part of Libav.
11 * Libav is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License as published by the Free Software Foundation; either
14 * version 2.1 of the License, or (at your option) any later version.
16 * Libav is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 * Lesser General Public License for more details.
21 * You should have received a copy of the GNU Lesser General Public
22 * License along with Libav; if not, write to the Free Software
23 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
28 * Multithreading support functions
29 * @see doc/multithreading.txt
34 #if HAVE_SCHED_GETAFFINITY
38 #if HAVE_GETPROCESSAFFINITYMASK
43 #include <sys/param.h>
45 #include <sys/types.h>
46 #include <sys/sysctl.h>
55 #include "libavutil/common.h"
60 #include "w32pthreads.h"
63 typedef int (action_func
)(AVCodecContext
*c
, void *arg
);
64 typedef int (action_func2
)(AVCodecContext
*c
, void *arg
, int jobnr
, int threadnr
);
66 typedef struct ThreadContext
{
76 pthread_cond_t last_job_cond
;
77 pthread_cond_t current_job_cond
;
78 pthread_mutex_t current_job_lock
;
83 /// Max number of frame buffers that can be allocated when using frame threads.
84 #define MAX_BUFFERS (32+1)
87 * Context used by codec threads and stored in their AVCodecContext thread_opaque.
89 typedef struct PerThreadContext
{
90 struct FrameThreadContext
*parent
;
94 pthread_cond_t input_cond
; ///< Used to wait for a new packet from the main thread.
95 pthread_cond_t progress_cond
; ///< Used by child threads to wait for progress to change.
96 pthread_cond_t output_cond
; ///< Used by the main thread to wait for frames to finish.
98 pthread_mutex_t mutex
; ///< Mutex used to protect the contents of the PerThreadContext.
99 pthread_mutex_t progress_mutex
; ///< Mutex used to protect frame progress values and progress_cond.
101 AVCodecContext
*avctx
; ///< Context used to decode packets passed to this thread.
103 AVPacket avpkt
; ///< Input packet (for decoding) or output (for encoding).
104 int allocated_buf_size
; ///< Size allocated for avpkt.data
106 AVFrame frame
; ///< Output frame (for decoding) or input (for encoding).
107 int got_frame
; ///< The output of got_picture_ptr from the last avcodec_decode_video() call.
108 int result
; ///< The result of the last codec decode/encode() call.
111 STATE_INPUT_READY
, ///< Set when the thread is awaiting a packet.
112 STATE_SETTING_UP
, ///< Set before the codec has called ff_thread_finish_setup().
113 STATE_GET_BUFFER
, /**<
114 * Set when the codec calls get_buffer().
115 * State is returned to STATE_SETTING_UP afterwards.
117 STATE_SETUP_FINISHED
///< Set after the codec has called ff_thread_finish_setup().
121 * Array of frames passed to ff_thread_release_buffer().
122 * Frames are released after all threads referencing them are finished.
124 AVFrame released_buffers
[MAX_BUFFERS
];
125 int num_released_buffers
;
128 * Array of progress values used by ff_thread_get_buffer().
130 int progress
[MAX_BUFFERS
][2];
131 uint8_t progress_used
[MAX_BUFFERS
];
133 AVFrame
*requested_frame
; ///< AVFrame the codec passed to get_buffer()
137 * Context stored in the client AVCodecContext thread_opaque.
139 typedef struct FrameThreadContext
{
140 PerThreadContext
*threads
; ///< The contexts for each thread.
141 PerThreadContext
*prev_thread
; ///< The last thread submit_packet() was called on.
143 pthread_mutex_t buffer_mutex
; ///< Mutex used to protect get/release_buffer().
145 int next_decoding
; ///< The next context to submit a packet to.
146 int next_finished
; ///< The next context to return output from.
149 * Set for the first N packets, where N is the number of threads.
150 * While it is set, ff_thread_en/decode_frame won't return any results.
153 int die
; ///< Set when threads should exit.
154 } FrameThreadContext
;
157 /* H264 slice threading seems to be buggy with more than 16 threads,
158 * limit the number of threads to 16 for automatic detection */
159 #define MAX_AUTO_THREADS 16
161 static int get_logical_cpus(AVCodecContext
*avctx
)
163 int ret
, nb_cpus
= 1;
164 #if HAVE_SCHED_GETAFFINITY && defined(CPU_COUNT)
169 ret
= sched_getaffinity(0, sizeof(cpuset
), &cpuset
);
171 nb_cpus
= CPU_COUNT(&cpuset
);
173 #elif HAVE_GETPROCESSAFFINITYMASK
174 DWORD_PTR proc_aff
, sys_aff
;
175 ret
= GetProcessAffinityMask(GetCurrentProcess(), &proc_aff
, &sys_aff
);
177 nb_cpus
= av_popcount64(proc_aff
);
178 #elif HAVE_SYSCTL && defined(HW_NCPU)
179 int mib
[2] = { CTL_HW
, HW_NCPU
};
180 size_t len
= sizeof(nb_cpus
);
182 ret
= sysctl(mib
, 2, &nb_cpus
, &len
, NULL
, 0);
185 #elif HAVE_SYSCONF && defined(_SC_NPROC_ONLN)
186 nb_cpus
= sysconf(_SC_NPROC_ONLN
);
187 #elif HAVE_SYSCONF && defined(_SC_NPROCESSORS_ONLN)
188 nb_cpus
= sysconf(_SC_NPROCESSORS_ONLN
);
190 av_log(avctx
, AV_LOG_DEBUG
, "detected %d logical cores\n", nb_cpus
);
195 static void* attribute_align_arg
worker(void *v
)
197 AVCodecContext
*avctx
= v
;
198 ThreadContext
*c
= avctx
->thread_opaque
;
199 int our_job
= c
->job_count
;
200 int thread_count
= avctx
->thread_count
;
203 pthread_mutex_lock(&c
->current_job_lock
);
204 self_id
= c
->current_job
++;
206 while (our_job
>= c
->job_count
) {
207 if (c
->current_job
== thread_count
+ c
->job_count
)
208 pthread_cond_signal(&c
->last_job_cond
);
210 pthread_cond_wait(&c
->current_job_cond
, &c
->current_job_lock
);
214 pthread_mutex_unlock(&c
->current_job_lock
);
218 pthread_mutex_unlock(&c
->current_job_lock
);
220 c
->rets
[our_job
%c
->rets_count
] = c
->func
? c
->func(avctx
, (char*)c
->args
+ our_job
*c
->job_size
):
221 c
->func2(avctx
, c
->args
, our_job
, self_id
);
223 pthread_mutex_lock(&c
->current_job_lock
);
224 our_job
= c
->current_job
++;
228 static av_always_inline
void avcodec_thread_park_workers(ThreadContext
*c
, int thread_count
)
230 pthread_cond_wait(&c
->last_job_cond
, &c
->current_job_lock
);
231 pthread_mutex_unlock(&c
->current_job_lock
);
234 static void thread_free(AVCodecContext
*avctx
)
236 ThreadContext
*c
= avctx
->thread_opaque
;
239 pthread_mutex_lock(&c
->current_job_lock
);
241 pthread_cond_broadcast(&c
->current_job_cond
);
242 pthread_mutex_unlock(&c
->current_job_lock
);
244 for (i
=0; i
<avctx
->thread_count
; i
++)
245 pthread_join(c
->workers
[i
], NULL
);
247 pthread_mutex_destroy(&c
->current_job_lock
);
248 pthread_cond_destroy(&c
->current_job_cond
);
249 pthread_cond_destroy(&c
->last_job_cond
);
251 av_freep(&avctx
->thread_opaque
);
254 static int avcodec_thread_execute(AVCodecContext
*avctx
, action_func
* func
, void *arg
, int *ret
, int job_count
, int job_size
)
256 ThreadContext
*c
= avctx
->thread_opaque
;
259 if (!(avctx
->active_thread_type
&FF_THREAD_SLICE
) || avctx
->thread_count
<= 1)
260 return avcodec_default_execute(avctx
, func
, arg
, ret
, job_count
, job_size
);
265 pthread_mutex_lock(&c
->current_job_lock
);
267 c
->current_job
= avctx
->thread_count
;
268 c
->job_count
= job_count
;
269 c
->job_size
= job_size
;
274 c
->rets_count
= job_count
;
276 c
->rets
= &dummy_ret
;
279 pthread_cond_broadcast(&c
->current_job_cond
);
281 avcodec_thread_park_workers(c
, avctx
->thread_count
);
286 static int avcodec_thread_execute2(AVCodecContext
*avctx
, action_func2
* func2
, void *arg
, int *ret
, int job_count
)
288 ThreadContext
*c
= avctx
->thread_opaque
;
290 return avcodec_thread_execute(avctx
, NULL
, arg
, ret
, job_count
, 0);
293 static int thread_init(AVCodecContext
*avctx
)
297 int thread_count
= avctx
->thread_count
;
300 int nb_cpus
= get_logical_cpus(avctx
);
301 // use number of cores + 1 as thread count if there is more than one
303 thread_count
= avctx
->thread_count
= FFMIN(nb_cpus
+ 1, MAX_AUTO_THREADS
);
305 thread_count
= avctx
->thread_count
= 1;
308 if (thread_count
<= 1) {
309 avctx
->active_thread_type
= 0;
313 c
= av_mallocz(sizeof(ThreadContext
));
317 c
->workers
= av_mallocz(sizeof(pthread_t
)*thread_count
);
323 avctx
->thread_opaque
= c
;
328 pthread_cond_init(&c
->current_job_cond
, NULL
);
329 pthread_cond_init(&c
->last_job_cond
, NULL
);
330 pthread_mutex_init(&c
->current_job_lock
, NULL
);
331 pthread_mutex_lock(&c
->current_job_lock
);
332 for (i
=0; i
<thread_count
; i
++) {
333 if(pthread_create(&c
->workers
[i
], NULL
, worker
, avctx
)) {
334 avctx
->thread_count
= i
;
335 pthread_mutex_unlock(&c
->current_job_lock
);
336 ff_thread_free(avctx
);
341 avcodec_thread_park_workers(c
, thread_count
);
343 avctx
->execute
= avcodec_thread_execute
;
344 avctx
->execute2
= avcodec_thread_execute2
;
349 * Codec worker thread.
351 * Automatically calls ff_thread_finish_setup() if the codec does
352 * not provide an update_thread_context method, or if the codec returns
355 static attribute_align_arg
void *frame_worker_thread(void *arg
)
357 PerThreadContext
*p
= arg
;
358 FrameThreadContext
*fctx
= p
->parent
;
359 AVCodecContext
*avctx
= p
->avctx
;
360 const AVCodec
*codec
= avctx
->codec
;
363 if (p
->state
== STATE_INPUT_READY
&& !fctx
->die
) {
364 pthread_mutex_lock(&p
->mutex
);
365 while (p
->state
== STATE_INPUT_READY
&& !fctx
->die
)
366 pthread_cond_wait(&p
->input_cond
, &p
->mutex
);
367 pthread_mutex_unlock(&p
->mutex
);
370 if (fctx
->die
) break;
372 if (!codec
->update_thread_context
&& avctx
->thread_safe_callbacks
)
373 ff_thread_finish_setup(avctx
);
375 pthread_mutex_lock(&p
->mutex
);
376 avcodec_get_frame_defaults(&p
->frame
);
378 p
->result
= codec
->decode(avctx
, &p
->frame
, &p
->got_frame
, &p
->avpkt
);
380 /* many decoders assign whole AVFrames, thus overwriting extended_data;
381 * make sure it's set correctly */
382 p
->frame
.extended_data
= p
->frame
.data
;
384 if (p
->state
== STATE_SETTING_UP
) ff_thread_finish_setup(avctx
);
386 p
->state
= STATE_INPUT_READY
;
388 pthread_mutex_lock(&p
->progress_mutex
);
389 pthread_cond_signal(&p
->output_cond
);
390 pthread_mutex_unlock(&p
->progress_mutex
);
392 pthread_mutex_unlock(&p
->mutex
);
399 * Update the next thread's AVCodecContext with values from the reference thread's context.
401 * @param dst The destination context.
402 * @param src The source context.
403 * @param for_user 0 if the destination is a codec thread, 1 if the destination is the user's thread
405 static int update_context_from_thread(AVCodecContext
*dst
, AVCodecContext
*src
, int for_user
)
410 dst
->time_base
= src
->time_base
;
411 dst
->width
= src
->width
;
412 dst
->height
= src
->height
;
413 dst
->pix_fmt
= src
->pix_fmt
;
415 dst
->coded_width
= src
->coded_width
;
416 dst
->coded_height
= src
->coded_height
;
418 dst
->has_b_frames
= src
->has_b_frames
;
419 dst
->idct_algo
= src
->idct_algo
;
421 dst
->bits_per_coded_sample
= src
->bits_per_coded_sample
;
422 dst
->sample_aspect_ratio
= src
->sample_aspect_ratio
;
423 dst
->dtg_active_format
= src
->dtg_active_format
;
425 dst
->profile
= src
->profile
;
426 dst
->level
= src
->level
;
428 dst
->bits_per_raw_sample
= src
->bits_per_raw_sample
;
429 dst
->ticks_per_frame
= src
->ticks_per_frame
;
430 dst
->color_primaries
= src
->color_primaries
;
432 dst
->color_trc
= src
->color_trc
;
433 dst
->colorspace
= src
->colorspace
;
434 dst
->color_range
= src
->color_range
;
435 dst
->chroma_sample_location
= src
->chroma_sample_location
;
439 dst
->coded_frame
= src
->coded_frame
;
441 if (dst
->codec
->update_thread_context
)
442 err
= dst
->codec
->update_thread_context(dst
, src
);
449 * Update the next thread's AVCodecContext with values set by the user.
451 * @param dst The destination context.
452 * @param src The source context.
453 * @return 0 on success, negative error code on failure
455 static int update_context_from_user(AVCodecContext
*dst
, AVCodecContext
*src
)
457 #define copy_fields(s, e) memcpy(&dst->s, &src->s, (char*)&dst->e - (char*)&dst->s);
458 dst
->flags
= src
->flags
;
460 dst
->draw_horiz_band
= src
->draw_horiz_band
;
461 dst
->get_buffer
= src
->get_buffer
;
462 dst
->release_buffer
= src
->release_buffer
;
464 dst
->opaque
= src
->opaque
;
465 dst
->debug
= src
->debug
;
466 dst
->debug_mv
= src
->debug_mv
;
468 dst
->slice_flags
= src
->slice_flags
;
469 dst
->flags2
= src
->flags2
;
471 copy_fields(skip_loop_filter
, subtitle_header
);
473 dst
->frame_number
= src
->frame_number
;
474 dst
->reordered_opaque
= src
->reordered_opaque
;
476 if (src
->slice_count
&& src
->slice_offset
) {
477 if (dst
->slice_count
< src
->slice_count
) {
478 int *tmp
= av_realloc(dst
->slice_offset
, src
->slice_count
*
479 sizeof(*dst
->slice_offset
));
481 av_free(dst
->slice_offset
);
482 return AVERROR(ENOMEM
);
484 dst
->slice_offset
= tmp
;
486 memcpy(dst
->slice_offset
, src
->slice_offset
,
487 src
->slice_count
* sizeof(*dst
->slice_offset
));
489 dst
->slice_count
= src
->slice_count
;
494 static void free_progress(AVFrame
*f
)
496 PerThreadContext
*p
= f
->owner
->thread_opaque
;
497 int *progress
= f
->thread_opaque
;
499 p
->progress_used
[(progress
- p
->progress
[0]) / 2] = 0;
502 /// Releases the buffers that this decoding thread was the last user of.
503 static void release_delayed_buffers(PerThreadContext
*p
)
505 FrameThreadContext
*fctx
= p
->parent
;
507 while (p
->num_released_buffers
> 0) {
510 pthread_mutex_lock(&fctx
->buffer_mutex
);
511 f
= &p
->released_buffers
[--p
->num_released_buffers
];
513 f
->thread_opaque
= NULL
;
515 f
->owner
->release_buffer(f
->owner
, f
);
516 pthread_mutex_unlock(&fctx
->buffer_mutex
);
520 static int submit_packet(PerThreadContext
*p
, AVPacket
*avpkt
)
522 FrameThreadContext
*fctx
= p
->parent
;
523 PerThreadContext
*prev_thread
= fctx
->prev_thread
;
524 const AVCodec
*codec
= p
->avctx
->codec
;
525 uint8_t *buf
= p
->avpkt
.data
;
527 if (!avpkt
->size
&& !(codec
->capabilities
& CODEC_CAP_DELAY
)) return 0;
529 pthread_mutex_lock(&p
->mutex
);
531 release_delayed_buffers(p
);
535 if (prev_thread
->state
== STATE_SETTING_UP
) {
536 pthread_mutex_lock(&prev_thread
->progress_mutex
);
537 while (prev_thread
->state
== STATE_SETTING_UP
)
538 pthread_cond_wait(&prev_thread
->progress_cond
, &prev_thread
->progress_mutex
);
539 pthread_mutex_unlock(&prev_thread
->progress_mutex
);
542 err
= update_context_from_thread(p
->avctx
, prev_thread
->avctx
, 0);
544 pthread_mutex_unlock(&p
->mutex
);
549 av_fast_malloc(&buf
, &p
->allocated_buf_size
, avpkt
->size
+ FF_INPUT_BUFFER_PADDING_SIZE
);
552 memcpy(buf
, avpkt
->data
, avpkt
->size
);
553 memset(buf
+ avpkt
->size
, 0, FF_INPUT_BUFFER_PADDING_SIZE
);
555 p
->state
= STATE_SETTING_UP
;
556 pthread_cond_signal(&p
->input_cond
);
557 pthread_mutex_unlock(&p
->mutex
);
560 * If the client doesn't have a thread-safe get_buffer(),
561 * then decoding threads call back to the main thread,
562 * and it calls back to the client here.
565 if (!p
->avctx
->thread_safe_callbacks
&&
566 p
->avctx
->get_buffer
!= avcodec_default_get_buffer
) {
567 while (p
->state
!= STATE_SETUP_FINISHED
&& p
->state
!= STATE_INPUT_READY
) {
568 pthread_mutex_lock(&p
->progress_mutex
);
569 while (p
->state
== STATE_SETTING_UP
)
570 pthread_cond_wait(&p
->progress_cond
, &p
->progress_mutex
);
572 if (p
->state
== STATE_GET_BUFFER
) {
573 p
->result
= ff_get_buffer(p
->avctx
, p
->requested_frame
);
574 p
->state
= STATE_SETTING_UP
;
575 pthread_cond_signal(&p
->progress_cond
);
577 pthread_mutex_unlock(&p
->progress_mutex
);
581 fctx
->prev_thread
= p
;
582 fctx
->next_decoding
++;
587 int ff_thread_decode_frame(AVCodecContext
*avctx
,
588 AVFrame
*picture
, int *got_picture_ptr
,
591 FrameThreadContext
*fctx
= avctx
->thread_opaque
;
592 int finished
= fctx
->next_finished
;
597 * Submit a packet to the next decoding thread.
600 p
= &fctx
->threads
[fctx
->next_decoding
];
601 err
= update_context_from_user(p
->avctx
, avctx
);
603 err
= submit_packet(p
, avpkt
);
607 * If we're still receiving the initial packets, don't return a frame.
610 if (fctx
->delaying
) {
611 if (fctx
->next_decoding
>= (avctx
->thread_count
-1)) fctx
->delaying
= 0;
619 * Return the next available frame from the oldest thread.
620 * If we're at the end of the stream, then we have to skip threads that
621 * didn't output a frame, because we don't want to accidentally signal
622 * EOF (avpkt->size == 0 && *got_picture_ptr == 0).
626 p
= &fctx
->threads
[finished
++];
628 if (p
->state
!= STATE_INPUT_READY
) {
629 pthread_mutex_lock(&p
->progress_mutex
);
630 while (p
->state
!= STATE_INPUT_READY
)
631 pthread_cond_wait(&p
->output_cond
, &p
->progress_mutex
);
632 pthread_mutex_unlock(&p
->progress_mutex
);
636 *got_picture_ptr
= p
->got_frame
;
637 picture
->pkt_dts
= p
->avpkt
.dts
;
640 * A later call with avkpt->size == 0 may loop over all threads,
641 * including this one, searching for a frame to return before being
642 * stopped by the "finished != fctx->next_finished" condition.
643 * Make sure we don't mistakenly return the same frame again.
647 if (finished
>= avctx
->thread_count
) finished
= 0;
648 } while (!avpkt
->size
&& !*got_picture_ptr
&& finished
!= fctx
->next_finished
);
650 update_context_from_thread(avctx
, p
->avctx
, 1);
652 if (fctx
->next_decoding
>= avctx
->thread_count
) fctx
->next_decoding
= 0;
654 fctx
->next_finished
= finished
;
656 /* return the size of the consumed packet if no error occurred */
657 return (p
->result
>= 0) ? avpkt
->size
: p
->result
;
660 void ff_thread_report_progress(AVFrame
*f
, int n
, int field
)
663 int *progress
= f
->thread_opaque
;
665 if (!progress
|| progress
[field
] >= n
) return;
667 p
= f
->owner
->thread_opaque
;
669 if (f
->owner
->debug
&FF_DEBUG_THREADS
)
670 av_log(f
->owner
, AV_LOG_DEBUG
, "%p finished %d field %d\n", progress
, n
, field
);
672 pthread_mutex_lock(&p
->progress_mutex
);
674 pthread_cond_broadcast(&p
->progress_cond
);
675 pthread_mutex_unlock(&p
->progress_mutex
);
678 void ff_thread_await_progress(AVFrame
*f
, int n
, int field
)
681 int *progress
= f
->thread_opaque
;
683 if (!progress
|| progress
[field
] >= n
) return;
685 p
= f
->owner
->thread_opaque
;
687 if (f
->owner
->debug
&FF_DEBUG_THREADS
)
688 av_log(f
->owner
, AV_LOG_DEBUG
, "thread awaiting %d field %d from %p\n", n
, field
, progress
);
690 pthread_mutex_lock(&p
->progress_mutex
);
691 while (progress
[field
] < n
)
692 pthread_cond_wait(&p
->progress_cond
, &p
->progress_mutex
);
693 pthread_mutex_unlock(&p
->progress_mutex
);
696 void ff_thread_finish_setup(AVCodecContext
*avctx
) {
697 PerThreadContext
*p
= avctx
->thread_opaque
;
699 if (!(avctx
->active_thread_type
&FF_THREAD_FRAME
)) return;
701 pthread_mutex_lock(&p
->progress_mutex
);
702 p
->state
= STATE_SETUP_FINISHED
;
703 pthread_cond_broadcast(&p
->progress_cond
);
704 pthread_mutex_unlock(&p
->progress_mutex
);
707 /// Waits for all threads to finish.
708 static void park_frame_worker_threads(FrameThreadContext
*fctx
, int thread_count
)
712 for (i
= 0; i
< thread_count
; i
++) {
713 PerThreadContext
*p
= &fctx
->threads
[i
];
715 if (p
->state
!= STATE_INPUT_READY
) {
716 pthread_mutex_lock(&p
->progress_mutex
);
717 while (p
->state
!= STATE_INPUT_READY
)
718 pthread_cond_wait(&p
->output_cond
, &p
->progress_mutex
);
719 pthread_mutex_unlock(&p
->progress_mutex
);
724 static void frame_thread_free(AVCodecContext
*avctx
, int thread_count
)
726 FrameThreadContext
*fctx
= avctx
->thread_opaque
;
727 const AVCodec
*codec
= avctx
->codec
;
730 park_frame_worker_threads(fctx
, thread_count
);
732 if (fctx
->prev_thread
&& fctx
->prev_thread
!= fctx
->threads
)
733 update_context_from_thread(fctx
->threads
->avctx
, fctx
->prev_thread
->avctx
, 0);
737 for (i
= 0; i
< thread_count
; i
++) {
738 PerThreadContext
*p
= &fctx
->threads
[i
];
740 pthread_mutex_lock(&p
->mutex
);
741 pthread_cond_signal(&p
->input_cond
);
742 pthread_mutex_unlock(&p
->mutex
);
745 pthread_join(p
->thread
, NULL
);
748 codec
->close(p
->avctx
);
752 release_delayed_buffers(p
);
755 for (i
= 0; i
< thread_count
; i
++) {
756 PerThreadContext
*p
= &fctx
->threads
[i
];
758 avcodec_default_free_buffers(p
->avctx
);
760 pthread_mutex_destroy(&p
->mutex
);
761 pthread_mutex_destroy(&p
->progress_mutex
);
762 pthread_cond_destroy(&p
->input_cond
);
763 pthread_cond_destroy(&p
->progress_cond
);
764 pthread_cond_destroy(&p
->output_cond
);
765 av_freep(&p
->avpkt
.data
);
768 av_freep(&p
->avctx
->priv_data
);
769 av_freep(&p
->avctx
->internal
);
770 av_freep(&p
->avctx
->slice_offset
);
776 av_freep(&fctx
->threads
);
777 pthread_mutex_destroy(&fctx
->buffer_mutex
);
778 av_freep(&avctx
->thread_opaque
);
781 static int frame_thread_init(AVCodecContext
*avctx
)
783 int thread_count
= avctx
->thread_count
;
784 const AVCodec
*codec
= avctx
->codec
;
785 AVCodecContext
*src
= avctx
;
786 FrameThreadContext
*fctx
;
790 int nb_cpus
= get_logical_cpus(avctx
);
791 // use number of cores + 1 as thread count if there is more than one
793 thread_count
= avctx
->thread_count
= FFMIN(nb_cpus
+ 1, MAX_AUTO_THREADS
);
795 thread_count
= avctx
->thread_count
= 1;
798 if (thread_count
<= 1) {
799 avctx
->active_thread_type
= 0;
803 avctx
->thread_opaque
= fctx
= av_mallocz(sizeof(FrameThreadContext
));
805 fctx
->threads
= av_mallocz(sizeof(PerThreadContext
) * thread_count
);
806 pthread_mutex_init(&fctx
->buffer_mutex
, NULL
);
809 for (i
= 0; i
< thread_count
; i
++) {
810 AVCodecContext
*copy
= av_malloc(sizeof(AVCodecContext
));
811 PerThreadContext
*p
= &fctx
->threads
[i
];
813 pthread_mutex_init(&p
->mutex
, NULL
);
814 pthread_mutex_init(&p
->progress_mutex
, NULL
);
815 pthread_cond_init(&p
->input_cond
, NULL
);
816 pthread_cond_init(&p
->progress_cond
, NULL
);
817 pthread_cond_init(&p
->output_cond
, NULL
);
823 err
= AVERROR(ENOMEM
);
828 copy
->thread_opaque
= p
;
829 copy
->pkt
= &p
->avpkt
;
835 err
= codec
->init(copy
);
837 update_context_from_thread(avctx
, copy
, 1);
839 copy
->priv_data
= av_malloc(codec
->priv_data_size
);
840 if (!copy
->priv_data
) {
841 err
= AVERROR(ENOMEM
);
844 memcpy(copy
->priv_data
, src
->priv_data
, codec
->priv_data_size
);
845 copy
->internal
= av_malloc(sizeof(AVCodecInternal
));
846 if (!copy
->internal
) {
847 err
= AVERROR(ENOMEM
);
850 *copy
->internal
= *src
->internal
;
851 copy
->internal
->is_copy
= 1;
853 if (codec
->init_thread_copy
)
854 err
= codec
->init_thread_copy(copy
);
859 if (!pthread_create(&p
->thread
, NULL
, frame_worker_thread
, p
))
866 frame_thread_free(avctx
, i
+1);
871 void ff_thread_flush(AVCodecContext
*avctx
)
874 FrameThreadContext
*fctx
= avctx
->thread_opaque
;
876 if (!avctx
->thread_opaque
) return;
878 park_frame_worker_threads(fctx
, avctx
->thread_count
);
879 if (fctx
->prev_thread
) {
880 if (fctx
->prev_thread
!= &fctx
->threads
[0])
881 update_context_from_thread(fctx
->threads
[0].avctx
, fctx
->prev_thread
->avctx
, 0);
882 if (avctx
->codec
->flush
)
883 avctx
->codec
->flush(fctx
->threads
[0].avctx
);
886 fctx
->next_decoding
= fctx
->next_finished
= 0;
888 fctx
->prev_thread
= NULL
;
889 for (i
= 0; i
< avctx
->thread_count
; i
++) {
890 PerThreadContext
*p
= &fctx
->threads
[i
];
891 // Make sure decode flush calls with size=0 won't return old frames
894 release_delayed_buffers(p
);
898 static int *allocate_progress(PerThreadContext
*p
)
902 for (i
= 0; i
< MAX_BUFFERS
; i
++)
903 if (!p
->progress_used
[i
]) break;
905 if (i
== MAX_BUFFERS
) {
906 av_log(p
->avctx
, AV_LOG_ERROR
, "allocate_progress() overflow\n");
910 p
->progress_used
[i
] = 1;
912 return p
->progress
[i
];
915 int ff_thread_get_buffer(AVCodecContext
*avctx
, AVFrame
*f
)
917 PerThreadContext
*p
= avctx
->thread_opaque
;
922 if (!(avctx
->active_thread_type
&FF_THREAD_FRAME
)) {
923 f
->thread_opaque
= NULL
;
924 return ff_get_buffer(avctx
, f
);
927 if (p
->state
!= STATE_SETTING_UP
&&
928 (avctx
->codec
->update_thread_context
|| !avctx
->thread_safe_callbacks
)) {
929 av_log(avctx
, AV_LOG_ERROR
, "get_buffer() cannot be called after ff_thread_finish_setup()\n");
933 pthread_mutex_lock(&p
->parent
->buffer_mutex
);
934 f
->thread_opaque
= progress
= allocate_progress(p
);
937 pthread_mutex_unlock(&p
->parent
->buffer_mutex
);
944 if (avctx
->thread_safe_callbacks
||
945 avctx
->get_buffer
== avcodec_default_get_buffer
) {
946 err
= ff_get_buffer(avctx
, f
);
948 p
->requested_frame
= f
;
949 p
->state
= STATE_GET_BUFFER
;
950 pthread_mutex_lock(&p
->progress_mutex
);
951 pthread_cond_signal(&p
->progress_cond
);
953 while (p
->state
!= STATE_SETTING_UP
)
954 pthread_cond_wait(&p
->progress_cond
, &p
->progress_mutex
);
958 pthread_mutex_unlock(&p
->progress_mutex
);
960 if (!avctx
->codec
->update_thread_context
)
961 ff_thread_finish_setup(avctx
);
966 f
->thread_opaque
= NULL
;
968 pthread_mutex_unlock(&p
->parent
->buffer_mutex
);
973 void ff_thread_release_buffer(AVCodecContext
*avctx
, AVFrame
*f
)
975 PerThreadContext
*p
= avctx
->thread_opaque
;
976 FrameThreadContext
*fctx
;
981 if (!(avctx
->active_thread_type
&FF_THREAD_FRAME
)) {
982 avctx
->release_buffer(avctx
, f
);
986 if (p
->num_released_buffers
>= MAX_BUFFERS
) {
987 av_log(p
->avctx
, AV_LOG_ERROR
, "too many thread_release_buffer calls!\n");
991 if(avctx
->debug
& FF_DEBUG_BUFFERS
)
992 av_log(avctx
, AV_LOG_DEBUG
, "thread_release_buffer called on pic %p\n", f
);
995 pthread_mutex_lock(&fctx
->buffer_mutex
);
996 p
->released_buffers
[p
->num_released_buffers
++] = *f
;
997 pthread_mutex_unlock(&fctx
->buffer_mutex
);
998 memset(f
->data
, 0, sizeof(f
->data
));
1002 * Set the threading algorithms used.
1004 * Threading requires more than one thread.
1005 * Frame threading requires entire frames to be passed to the codec,
1006 * and introduces extra decoding delay, so is incompatible with low_delay.
1008 * @param avctx The context.
1010 static void validate_thread_parameters(AVCodecContext
*avctx
)
1012 int frame_threading_supported
= (avctx
->codec
->capabilities
& CODEC_CAP_FRAME_THREADS
)
1013 && !(avctx
->flags
& CODEC_FLAG_TRUNCATED
)
1014 && !(avctx
->flags
& CODEC_FLAG_LOW_DELAY
)
1015 && !(avctx
->flags2
& CODEC_FLAG2_CHUNKS
);
1016 if (avctx
->thread_count
== 1) {
1017 avctx
->active_thread_type
= 0;
1018 } else if (frame_threading_supported
&& (avctx
->thread_type
& FF_THREAD_FRAME
)) {
1019 avctx
->active_thread_type
= FF_THREAD_FRAME
;
1020 } else if (avctx
->codec
->capabilities
& CODEC_CAP_SLICE_THREADS
&&
1021 avctx
->thread_type
& FF_THREAD_SLICE
) {
1022 avctx
->active_thread_type
= FF_THREAD_SLICE
;
1023 } else if (!(avctx
->codec
->capabilities
& CODEC_CAP_AUTO_THREADS
)) {
1024 avctx
->thread_count
= 1;
1025 avctx
->active_thread_type
= 0;
1028 if (avctx
->thread_count
> MAX_AUTO_THREADS
)
1029 av_log(avctx
, AV_LOG_WARNING
,
1030 "Application has requested %d threads. Using a thread count greater than %d is not recommended.\n",
1031 avctx
->thread_count
, MAX_AUTO_THREADS
);
1034 int ff_thread_init(AVCodecContext
*avctx
)
1036 if (avctx
->thread_opaque
) {
1037 av_log(avctx
, AV_LOG_ERROR
, "avcodec_thread_init is ignored after avcodec_open\n");
1046 validate_thread_parameters(avctx
);
1048 if (avctx
->active_thread_type
&FF_THREAD_SLICE
)
1049 return thread_init(avctx
);
1050 else if (avctx
->active_thread_type
&FF_THREAD_FRAME
)
1051 return frame_thread_init(avctx
);
1057 void ff_thread_free(AVCodecContext
*avctx
)
1059 if (avctx
->active_thread_type
&FF_THREAD_FRAME
)
1060 frame_thread_free(avctx
, avctx
->thread_count
);