/*
 * Copyright © 2018, VideoLAN and dav1d authors
 * Copyright © 2018, Two Orioles, LLC
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice, this
 *    list of conditions and the following disclaimer.
 *
 * 2. Redistributions in binary form must reproduce the above copyright notice,
 *    this list of conditions and the following disclaimer in the documentation
 *    and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"

#include "common/frame.h"

#include "src/thread_task.h"
#include "src/fg_apply.h"

// This function resets the cur pointer to the first frame theoretically
// executable after a task completed (ie. each time we update some progress or
// insert some tasks in the queue).
// When frame_idx is set, it can be either from a completed task, or from tasks
// inserted in the queue, in which case we have to make sure the cur pointer
// isn't past this insert.
// The special case where frame_idx is UINT_MAX is to handle the reset after
// completing a task and locklessly signaling progress. In this case we don't
// enter a critical section, which is needed for this function, so we set an
// atomic for a delayed handling, happening here. Meaning we can call this
// function without any actual update other than what's in the atomic, hence
// this special case.
static inline int reset_task_cur(const Dav1dContext *const c,
                                 struct TaskThreadData *const ttd,
                                 unsigned frame_idx)
{
    const unsigned first = atomic_load(&ttd->first);
    unsigned reset_frame_idx = atomic_exchange(&ttd->reset_task_cur, UINT_MAX);
    if (reset_frame_idx < first) {
        if (frame_idx == UINT_MAX) return 0;
        reset_frame_idx = UINT_MAX;
    }
    if (!ttd->cur && c->fc[first].task_thread.task_cur_prev == NULL)
        return 0;
    if (reset_frame_idx != UINT_MAX) {
        if (frame_idx == UINT_MAX) {
            if (reset_frame_idx > first + ttd->cur)
                return 0;
            ttd->cur = reset_frame_idx - first;
            goto cur_found;
        }
    } else if (frame_idx == UINT_MAX)
        return 0;
    if (frame_idx < first) frame_idx += c->n_fc;
    const unsigned min_frame_idx = umin(reset_frame_idx, frame_idx);
    const unsigned cur_frame_idx = first + ttd->cur;
    if (ttd->cur < c->n_fc && cur_frame_idx < min_frame_idx)
        return 0;
    for (ttd->cur = min_frame_idx - first; ttd->cur < c->n_fc; ttd->cur++)
        if (c->fc[(first + ttd->cur) % c->n_fc].task_thread.task_head)
            break;
cur_found:
    for (unsigned i = ttd->cur; i < c->n_fc; i++)
        c->fc[(first + i) % c->n_fc].task_thread.task_cur_prev = NULL;
    return 1;
}
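
// Lock-free counterpart to reset_task_cur(): a runner that signals progress
// without holding the scheduler lock records the lowest affected frame index
// in ttd->reset_task_cur; the next runner that does hold the lock replays it
// in reset_task_cur() above.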
static inline void reset_task_cur_async(struct TaskThreadData *const ttd,
                                        unsigned frame_idx, unsigned n_frames)
{
    const unsigned first = atomic_load(&ttd->first);
    if (frame_idx < first) frame_idx += n_frames;
    unsigned last_idx = frame_idx;
    do {
        frame_idx = last_idx;
        last_idx = atomic_exchange(&ttd->reset_task_cur, frame_idx);
    } while (last_idx < frame_idx);
    if (frame_idx == first && atomic_load(&ttd->first) != first) {
        unsigned expected = frame_idx;
        atomic_compare_exchange_strong(&ttd->reset_task_cur, &expected, UINT_MAX);
    }
}
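
// Splice the pre-linked chain [first..last] into f's task queue between
// nodes a and b (a == NULL inserts at the head, b == NULL at the tail),
// then wake a runner if requested.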
static void insert_tasks_between(Dav1dFrameContext *const f,
                                 Dav1dTask *const first, Dav1dTask *const last,
                                 Dav1dTask *const a, Dav1dTask *const b,
                                 const int cond_signal)
{
    struct TaskThreadData *const ttd = f->task_thread.ttd;
    if (atomic_load(f->c->flush)) return;
    assert(!a || a->next == b);
    if (!a) f->task_thread.task_head = first;
    else a->next = first;
    if (!b) f->task_thread.task_tail = last;
    last->next = b;
    reset_task_cur(f->c, ttd, first->frame_idx);
    if (cond_signal && !atomic_fetch_or(&ttd->cond_signaled, 1))
        pthread_cond_signal(&ttd->cond);
}

static void insert_tasks(Dav1dFrameContext *const f,
                         Dav1dTask *const first, Dav1dTask *const last,
                         const int cond_signal)
{
    // insert task back into task queue
    Dav1dTask *t_ptr, *prev_t = NULL;
    for (t_ptr = f->task_thread.task_head;
         t_ptr; prev_t = t_ptr, t_ptr = t_ptr->next)
    {
        // entropy coding precedes other steps
        if (t_ptr->type == DAV1D_TASK_TYPE_TILE_ENTROPY) {
            if (first->type > DAV1D_TASK_TYPE_TILE_ENTROPY) continue;
            // both are entropy
            if (first->sby > t_ptr->sby) continue;
            if (first->sby < t_ptr->sby) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            // same sby
        } else {
            if (first->type == DAV1D_TASK_TYPE_TILE_ENTROPY) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            if (first->sby > t_ptr->sby) continue;
            if (first->sby < t_ptr->sby) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            // same sby
            if (first->type > t_ptr->type) continue;
            if (first->type < t_ptr->type) {
                insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
                return;
            }
            // same task type
        }

        // sort by tile-id
        assert(first->type == DAV1D_TASK_TYPE_TILE_RECONSTRUCTION ||
               first->type == DAV1D_TASK_TYPE_TILE_ENTROPY);
        assert(first->type == t_ptr->type);
        assert(t_ptr->sby == first->sby);
        const int p = first->type == DAV1D_TASK_TYPE_TILE_ENTROPY;
        const int t_tile_idx = (int) (first - f->task_thread.tile_tasks[p]);
        const int p_tile_idx = (int) (t_ptr - f->task_thread.tile_tasks[p]);
        assert(t_tile_idx != p_tile_idx);
        if (t_tile_idx > p_tile_idx) continue;
        insert_tasks_between(f, first, last, prev_t, t_ptr, cond_signal);
        return;
    }
    // append at the end
    insert_tasks_between(f, first, last, prev_t, NULL, cond_signal);
}

static inline void insert_task(Dav1dFrameContext *const f,
                               Dav1dTask *const t, const int cond_signal)
{
    insert_tasks(f, t, t, cond_signal);
}
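
// Tasks created while the scheduler lock is unavailable go onto a per-frame
// pending list guarded by its own mutex; the merge flag tells runners to
// fold that list into the main queue (see merge_pending_frame() below).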
static inline void add_pending(Dav1dFrameContext *const f, Dav1dTask *const t) {
    pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
    t->next = NULL;
    if (!f->task_thread.pending_tasks.head)
        f->task_thread.pending_tasks.head = t;
    else
        f->task_thread.pending_tasks.tail->next = t;
    f->task_thread.pending_tasks.tail = t;
    atomic_store(&f->task_thread.pending_tasks.merge, 1);
    pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
}

static inline int merge_pending_frame(Dav1dFrameContext *const f) {
    int const merge = atomic_load(&f->task_thread.pending_tasks.merge);
    if (merge) {
        pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
        Dav1dTask *t = f->task_thread.pending_tasks.head;
        f->task_thread.pending_tasks.head = NULL;
        f->task_thread.pending_tasks.tail = NULL;
        atomic_store(&f->task_thread.pending_tasks.merge, 0);
        pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);
        while (t) {
            Dav1dTask *const tmp = t->next;
            insert_task(f, t, 0);
            t = tmp;
        }
    }
    return merge;
}

static inline int merge_pending(const Dav1dContext *const c) {
    int res = 0;
    for (unsigned i = 0; i < c->n_fc; i++)
        res |= merge_pending_frame(&c->fc[i]);
    return res;
}
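
// Set up the first post-filter task for this pass, growing the per-frame
// task array and (for reconstruction passes) the frame/copy-lpf progress
// bitmaps as needed. The task type is chosen from the first filter stage
// actually enabled for this frame.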
static int create_filter_sbrow(Dav1dFrameContext *const f,
                               const int pass, Dav1dTask **res_t)
{
    const int has_deblock = f->frame_hdr->loopfilter.level_y[0] ||
                            f->frame_hdr->loopfilter.level_y[1];
    const int has_cdef = f->seq_hdr->cdef;
    const int has_resize = f->frame_hdr->width[0] != f->frame_hdr->width[1];
    const int has_lr = f->lf.restore_planes;

    Dav1dTask *tasks = f->task_thread.tasks;
    const int uses_2pass = f->c->n_fc > 1;
    int num_tasks = f->sbh * (1 + uses_2pass);
    if (num_tasks > f->task_thread.num_tasks) {
        const size_t size = sizeof(Dav1dTask) * num_tasks;
        tasks = dav1d_realloc(ALLOC_COMMON_CTX, f->task_thread.tasks, size);
        if (!tasks) return -1;
        memset(tasks, 0, size);
        f->task_thread.tasks = tasks;
        f->task_thread.num_tasks = num_tasks;
    }
    tasks += f->sbh * (pass & 1);

    if (pass & 1) {
        f->frame_thread.entropy_progress = 0;
    } else {
        const int prog_sz = ((f->sbh + 31) & ~31) >> 5;
        if (prog_sz > f->frame_thread.prog_sz) {
            atomic_uint *const prog = dav1d_realloc(ALLOC_COMMON_CTX,
                                                    f->frame_thread.frame_progress,
                                                    2 * prog_sz * sizeof(*prog));
            if (!prog) return -1;
            f->frame_thread.frame_progress = prog;
            f->frame_thread.copy_lpf_progress = prog + prog_sz;
        }
        f->frame_thread.prog_sz = prog_sz;
        memset(f->frame_thread.frame_progress, 0, prog_sz * sizeof(atomic_uint));
        memset(f->frame_thread.copy_lpf_progress, 0, prog_sz * sizeof(atomic_uint));
        atomic_store(&f->frame_thread.deblock_progress, 0);
    }
    f->frame_thread.next_tile_row[pass & 1] = 0;

    Dav1dTask *t = &tasks[0];
    t->sby = 0;
    t->recon_progress = 1;
    t->deblock_progress = 0;
    t->type = pass == 1 ? DAV1D_TASK_TYPE_ENTROPY_PROGRESS :
              has_deblock ? DAV1D_TASK_TYPE_DEBLOCK_COLS :
              has_cdef || has_lr /* i.e. LR backup */ ? DAV1D_TASK_TYPE_DEBLOCK_ROWS :
              has_resize ? DAV1D_TASK_TYPE_SUPER_RESOLUTION :
                           DAV1D_TASK_TYPE_RECONSTRUCTION_PROGRESS;
    t->frame_idx = (int)(f - f->c->fc);

    *res_t = t;
    return 0;
}

int dav1d_task_create_tile_sbrow(Dav1dFrameContext *const f, const int pass,
                                 const int cond_signal)
{
    Dav1dTask *tasks = f->task_thread.tile_tasks[0];
    const int uses_2pass = f->c->n_fc > 1;
    const int num_tasks = f->frame_hdr->tiling.cols * f->frame_hdr->tiling.rows;
    if (pass < 2) {
        int alloc_num_tasks = num_tasks * (1 + uses_2pass);
        if (alloc_num_tasks > f->task_thread.num_tile_tasks) {
            const size_t size = sizeof(Dav1dTask) * alloc_num_tasks;
            tasks = dav1d_realloc(ALLOC_COMMON_CTX, f->task_thread.tile_tasks[0], size);
            if (!tasks) return -1;
            memset(tasks, 0, size);
            f->task_thread.tile_tasks[0] = tasks;
            f->task_thread.num_tile_tasks = alloc_num_tasks;
        }
        f->task_thread.tile_tasks[1] = tasks + num_tasks;
    }
    tasks += num_tasks * (pass & 1);

    Dav1dTask *pf_t;
    if (create_filter_sbrow(f, pass, &pf_t))
        return -1;

    Dav1dTask *prev_t = NULL;
    for (int tile_idx = 0; tile_idx < num_tasks; tile_idx++) {
        Dav1dTileState *const ts = &f->ts[tile_idx];
        Dav1dTask *t = &tasks[tile_idx];
        t->sby = ts->tiling.row_start >> f->sb_shift;
        if (pf_t && t->sby) {
            prev_t->next = pf_t;
            prev_t = pf_t;
            pf_t = NULL;
        }
        t->recon_progress = 0;
        t->deblock_progress = 0;
        t->deps_skip = 0;
        t->type = pass != 1 ? DAV1D_TASK_TYPE_TILE_RECONSTRUCTION :
                              DAV1D_TASK_TYPE_TILE_ENTROPY;
        t->frame_idx = (int)(f - f->c->fc);
        if (prev_t) prev_t->next = t;
        prev_t = t;
    }
    if (pf_t) {
        prev_t->next = pf_t;
        prev_t = pf_t;
    }
    prev_t->next = NULL;

    atomic_store(&f->task_thread.done[pass & 1], 0);

    // XXX in theory this could be done locklessly, at this point there are no
    // tasks in the frameQ, so no other runner should be using this lock, but
    // we must add both passes at once
    pthread_mutex_lock(&f->task_thread.pending_tasks.lock);
    assert(f->task_thread.pending_tasks.head == NULL || pass == 2);
    if (!f->task_thread.pending_tasks.head)
        f->task_thread.pending_tasks.head = &tasks[0];
    else
        f->task_thread.pending_tasks.tail->next = &tasks[0];
    f->task_thread.pending_tasks.tail = prev_t;
    atomic_store(&f->task_thread.pending_tasks.merge, 1);
    atomic_store(&f->task_thread.init_done, 1);
    pthread_mutex_unlock(&f->task_thread.pending_tasks.lock);

    return 0;
}

void dav1d_task_frame_init(Dav1dFrameContext *const f) {
    const Dav1dContext *const c = f->c;

    atomic_store(&f->task_thread.init_done, 0);
    // schedule init task, which will schedule the remaining tasks
    Dav1dTask *const t = &f->task_thread.init_task;
    t->type = DAV1D_TASK_TYPE_INIT;
    t->frame_idx = (int)(f - c->fc);
    t->sby = 0;
    t->recon_progress = t->deblock_progress = 0;
    insert_task(f, t, 1);
}
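
// Blocking entry point for standalone film-grain application: hand the
// in/out picture pair to the worker pool and wait until a runner reports
// the delayed film-grain task as finished.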
void dav1d_task_delayed_fg(Dav1dContext *const c, Dav1dPicture *const out,
                           const Dav1dPicture *const in)
{
    struct TaskThreadData *const ttd = &c->task_thread;
    ttd->delayed_fg.in = in;
    ttd->delayed_fg.out = out;
    ttd->delayed_fg.type = DAV1D_TASK_TYPE_FG_PREP;
    atomic_init(&ttd->delayed_fg.progress[0], 0);
    atomic_init(&ttd->delayed_fg.progress[1], 0);
    pthread_mutex_lock(&ttd->lock);
    ttd->delayed_fg.exec = 1;
    ttd->delayed_fg.finished = 0;
    pthread_cond_signal(&ttd->cond);
    do {
        pthread_cond_wait(&ttd->delayed_fg.cond, &ttd->lock);
    } while (!ttd->delayed_fg.finished);
    pthread_mutex_unlock(&ttd->lock);
}

static inline int ensure_progress(struct TaskThreadData *const ttd,
                                  Dav1dFrameContext *const f,
                                  Dav1dTask *const t, const enum TaskType type,
                                  atomic_int *const state, int *const target)
{
    // deblock_rows (non-LR portion) depends on deblock of previous sbrow,
    // so ensure that completed. if not, re-add to task-queue; else, fall-through
    int p1 = atomic_load(state);
    if (p1 < t->sby) {
        t->type = type;
        t->recon_progress = t->deblock_progress = 0;
        *target = t->sby;
        add_pending(f, t);
        pthread_mutex_lock(&ttd->lock);
        return 1;
    }
    return 0;
}
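
// Returns 1 if the tile task t still has unmet dependencies (own-tile
// progress, the other pass, or referenced frames' progress) and must be
// re-queued, 0 if it is ready to run.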
static inline int check_tile(Dav1dTask *const t, Dav1dFrameContext *const f,
                             const int frame_mt)
{
    const int tp = t->type == DAV1D_TASK_TYPE_TILE_ENTROPY;
    const int tile_idx = (int)(t - f->task_thread.tile_tasks[tp]);
    Dav1dTileState *const ts = &f->ts[tile_idx];
    const int p1 = atomic_load(&ts->progress[tp]);
    if (p1 < t->sby) return 1;
    int error = p1 == TILE_ERROR;
    error |= atomic_fetch_or(&f->task_thread.error, error);
    if (!error && frame_mt && !tp) {
        const int p2 = atomic_load(&ts->progress[1]);
        if (p2 <= t->sby) return 1;
        error = p2 == TILE_ERROR;
        error |= atomic_fetch_or(&f->task_thread.error, error);
    }
    if (!error && frame_mt && !IS_KEY_OR_INTRA(f->frame_hdr)) {
        // check reference state
        const Dav1dThreadPicture *p = &f->sr_cur;
        const int ss_ver = p->p.p.layout == DAV1D_PIXEL_LAYOUT_I420;
        const unsigned p_b = (t->sby + 1) << (f->sb_shift + 2);
        const int tile_sby = t->sby - (ts->tiling.row_start >> f->sb_shift);
        const int (*const lowest_px)[2] = ts->lowest_pixel[tile_sby];
        for (int n = t->deps_skip; n < 7; n++, t->deps_skip++) {
            unsigned lowest;
            if (tp) {
                // if temporal mv refs are disabled, we only need this
                // for the primary ref; if segmentation is disabled, we
                // don't even need that
                lowest = p_b;
            } else {
                // +8 is postfilter-induced delay
                const int y = lowest_px[n][0] == INT_MIN ? INT_MIN :
                              lowest_px[n][0] + 8;
                const int uv = lowest_px[n][1] == INT_MIN ? INT_MIN :
                               lowest_px[n][1] * (1 << ss_ver) + 8;
                const int max = imax(y, uv);
                if (max == INT_MIN) continue;
                lowest = iclip(max, 1, f->refp[n].p.p.h);
            }
            const unsigned p3 = atomic_load(&f->refp[n].progress[!tp]);
            if (p3 < lowest) return 1;
            atomic_fetch_or(&f->task_thread.error, p3 == FRAME_ERROR);
        }
    }
    return 0;
}
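
// Derive the frame's overall progress (in sbrows) by scanning the
// frame_progress bitmap for the first superblock row that has not yet
// completed.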
static inline int get_frame_progress(const Dav1dContext *const c,
                                     const Dav1dFrameContext *const f)
{
    unsigned frame_prog = c->n_fc > 1 ? atomic_load(&f->sr_cur.progress[1]) : 0;
    if (frame_prog >= FRAME_ERROR)
        return f->sbh - 1;
    int idx = frame_prog >> (f->sb_shift + 7);
    int prog;
    do {
        atomic_uint *state = &f->frame_thread.frame_progress[idx];
        const unsigned val = ~atomic_load(state);
        prog = val ? ctz(val) : 32;
        if (prog != 32) break;
        prog = 0;
    } while (++idx < f->frame_thread.prog_sz);
    return ((idx << 5) | prog) - 1;
}
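
// Mark the whole frame as failed: flag the error, zero the task counter,
// mark both passes done, and signal anyone waiting on the frame.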
static inline void abort_frame(Dav1dFrameContext *const f, const int error) {
    atomic_store(&f->task_thread.error, error == DAV1D_ERR(EINVAL) ? 1 : -1);
    atomic_store(&f->task_thread.task_counter, 0);
    atomic_store(&f->task_thread.done[0], 1);
    atomic_store(&f->task_thread.done[1], 1);
    atomic_store(&f->sr_cur.progress[0], FRAME_ERROR);
    atomic_store(&f->sr_cur.progress[1], FRAME_ERROR);
    dav1d_decode_frame_exit(f, error);
    f->n_tile_data = 0;
    pthread_cond_signal(&f->task_thread.cond);
}
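
// Film-grain runner: FG_PREP generates the grain LUTs once, then falls
// through to FG_APPLY, where available workers grab row groups
// FG_BLOCK_SIZE lines tall until the picture is done; the last runner to
// finish signals dav1d_task_delayed_fg().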
static inline void delayed_fg_task(const Dav1dContext *const c,
                                   struct TaskThreadData *const ttd)
{
    const Dav1dPicture *const in = ttd->delayed_fg.in;
    Dav1dPicture *const out = ttd->delayed_fg.out;
#if CONFIG_16BPC
    int off;
    if (out->p.bpc != 8)
        off = (out->p.bpc >> 1) - 4;
#endif
    switch (ttd->delayed_fg.type) {
    case DAV1D_TASK_TYPE_FG_PREP:
        ttd->delayed_fg.exec = 0;
        if (atomic_load(&ttd->cond_signaled))
            pthread_cond_signal(&ttd->cond);
        pthread_mutex_unlock(&ttd->lock);
        switch (out->p.bpc) {
#if CONFIG_8BPC
        case 8:
            dav1d_prep_grain_8bpc(&c->dsp[0].fg, out, in,
                                  ttd->delayed_fg.scaling_8bpc,
                                  ttd->delayed_fg.grain_lut_8bpc);
            break;
#endif
#if CONFIG_16BPC
        case 10:
        case 12:
            dav1d_prep_grain_16bpc(&c->dsp[off].fg, out, in,
                                   ttd->delayed_fg.scaling_16bpc,
                                   ttd->delayed_fg.grain_lut_16bpc);
            break;
#endif
        default: abort();
        }
        ttd->delayed_fg.type = DAV1D_TASK_TYPE_FG_APPLY;
        pthread_mutex_lock(&ttd->lock);
        ttd->delayed_fg.exec = 1;
        // fall-through
    case DAV1D_TASK_TYPE_FG_APPLY:;
        int row = atomic_fetch_add(&ttd->delayed_fg.progress[0], 1);
        pthread_mutex_unlock(&ttd->lock);
        int progmax = (out->p.h + FG_BLOCK_SIZE - 1) / FG_BLOCK_SIZE;
        while (row < progmax) {
            if (row + 1 < progmax)
                pthread_cond_signal(&ttd->cond);
            else {
                pthread_mutex_lock(&ttd->lock);
                ttd->delayed_fg.exec = 0;
                pthread_mutex_unlock(&ttd->lock);
            }
            switch (out->p.bpc) {
#if CONFIG_8BPC
            case 8:
                dav1d_apply_grain_row_8bpc(&c->dsp[0].fg, out, in,
                                           ttd->delayed_fg.scaling_8bpc,
                                           ttd->delayed_fg.grain_lut_8bpc, row);
                break;
#endif
#if CONFIG_16BPC
            case 10:
            case 12:
                dav1d_apply_grain_row_16bpc(&c->dsp[off].fg, out, in,
                                            ttd->delayed_fg.scaling_16bpc,
                                            ttd->delayed_fg.grain_lut_16bpc, row);
                break;
#endif
            default: abort();
            }
            row = atomic_fetch_add(&ttd->delayed_fg.progress[0], 1);
            atomic_fetch_add(&ttd->delayed_fg.progress[1], 1);
        }
        pthread_mutex_lock(&ttd->lock);
        ttd->delayed_fg.exec = 0;
        int done = atomic_fetch_add(&ttd->delayed_fg.progress[1], 1) + 1;
        progmax = atomic_load(&ttd->delayed_fg.progress[0]);
        // signal for completion only once the last runner reaches this
        if (done >= progmax) {
            ttd->delayed_fg.finished = 1;
            pthread_cond_signal(&ttd->delayed_fg.cond);
        }
        break;
    default: abort();
    }
}
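
// Main worker loop. Under the scheduler lock, each runner picks work in
// priority order: delayed film grain first, then frame init tasks, then
// regular decoding tasks in frame order; if nothing is runnable, it parks
// on ttd->cond until progress is signaled.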
void *dav1d_worker_task(void *data) {
    Dav1dTaskContext *const tc = data;
    const Dav1dContext *const c = tc->c;
    struct TaskThreadData *const ttd = tc->task_thread.ttd;

    dav1d_set_thread_name("dav1d-worker");

    pthread_mutex_lock(&ttd->lock);
    for (;;) {
        if (tc->task_thread.die) break;
        if (atomic_load(c->flush)) goto park;

        merge_pending(c);
        if (ttd->delayed_fg.exec) { // run delayed film grain first
            delayed_fg_task(c, ttd);
            continue;
        }

        Dav1dFrameContext *f;
        Dav1dTask *t, *prev_t = NULL;
        if (c->n_fc > 1) { // run init tasks second
            for (unsigned i = 0; i < c->n_fc; i++) {
                const unsigned first = atomic_load(&ttd->first);
                f = &c->fc[(first + i) % c->n_fc];
                if (atomic_load(&f->task_thread.init_done)) continue;
                t = f->task_thread.task_head;
                if (!t) continue;
                if (t->type == DAV1D_TASK_TYPE_INIT) goto found;
                if (t->type == DAV1D_TASK_TYPE_INIT_CDF) {
                    // XXX This can be a simple else, if adding tasks of both
                    // passes at once (in dav1d_task_create_tile_sbrow).
                    // Adding the tasks to the pending Q can result in a
                    // thread merging them before setting init_done.
                    // We will need to set init_done before adding to the
                    // pending Q, so maybe return the tasks, set init_done,
                    // and add to pending Q only then.
                    const int p1 = f->in_cdf.progress ?
                        atomic_load(f->in_cdf.progress) : 1;
                    if (p1) {
                        atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
                        goto found;
                    }
                }
            }
        }
        while (ttd->cur < c->n_fc) { // run decoding tasks last
            const unsigned first = atomic_load(&ttd->first);
            f = &c->fc[(first + ttd->cur) % c->n_fc];
            merge_pending_frame(f);
            prev_t = f->task_thread.task_cur_prev;
            t = prev_t ? prev_t->next : f->task_thread.task_head;
            while (t) {
                if (t->type == DAV1D_TASK_TYPE_INIT_CDF) goto next;
                else if (t->type == DAV1D_TASK_TYPE_TILE_ENTROPY ||
                         t->type == DAV1D_TASK_TYPE_TILE_RECONSTRUCTION)
                {
                    // if not bottom sbrow of tile, this task will be re-added
                    // after it's finished
                    if (!check_tile(t, f, c->n_fc > 1))
                        goto found;
                } else if (t->recon_progress) {
                    const int p = t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS;
                    int error = atomic_load(&f->task_thread.error);
                    assert(!atomic_load(&f->task_thread.done[p]) || error);
                    const int tile_row_base = f->frame_hdr->tiling.cols *
                                              f->frame_thread.next_tile_row[p];
                    if (p) {
                        atomic_int *const prog = &f->frame_thread.entropy_progress;
                        const int p1 = atomic_load(prog);
                        if (p1 < t->sby) goto next;
                        atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
                    }
                    for (int tc = 0; tc < f->frame_hdr->tiling.cols; tc++) {
                        Dav1dTileState *const ts = &f->ts[tile_row_base + tc];
                        const int p2 = atomic_load(&ts->progress[p]);
                        if (p2 < t->recon_progress) goto next;
                        atomic_fetch_or(&f->task_thread.error, p2 == TILE_ERROR);
                    }
                    if (t->sby + 1 < f->sbh) {
                        // add sby+1 to list to replace this one
                        Dav1dTask *next_t = &t[1];
                        *next_t = *t;
                        next_t->sby++;
                        const int ntr = f->frame_thread.next_tile_row[p] + 1;
                        const int start = f->frame_hdr->tiling.row_start_sb[ntr];
                        if (next_t->sby == start)
                            f->frame_thread.next_tile_row[p] = ntr;
                        next_t->recon_progress = next_t->sby + 1;
                        insert_task(f, next_t, 0);
                    }
                    goto found;
                } else if (t->type == DAV1D_TASK_TYPE_CDEF) {
                    atomic_uint *prog = f->frame_thread.copy_lpf_progress;
                    const int p1 = atomic_load(&prog[(t->sby - 1) >> 5]);
                    if (p1 & (1U << ((t->sby - 1) & 31)))
                        goto found;
                } else {
                    assert(t->deblock_progress);
                    const int p1 = atomic_load(&f->frame_thread.deblock_progress);
                    if (p1 >= t->deblock_progress) {
                        atomic_fetch_or(&f->task_thread.error, p1 == TILE_ERROR);
                        goto found;
                    }
                }
            next:
                prev_t = t;
                t = t->next;
                f->task_thread.task_cur_prev = prev_t;
            }
            ttd->cur++;
        }
        if (reset_task_cur(c, ttd, UINT_MAX)) continue;
        if (merge_pending(c)) continue;
    park:
        tc->task_thread.flushed = 1;
        pthread_cond_signal(&tc->task_thread.td.cond);
        // we want to be woken up next time progress is signaled
        atomic_store(&ttd->cond_signaled, 0);
        pthread_cond_wait(&ttd->cond, &ttd->lock);
        tc->task_thread.flushed = 0;
        reset_task_cur(c, ttd, UINT_MAX);
        continue;

    found:
        // remove t from list
        if (prev_t) prev_t->next = t->next;
        else f->task_thread.task_head = t->next;
        if (!t->next) f->task_thread.task_tail = prev_t;
        if (t->type > DAV1D_TASK_TYPE_INIT_CDF && !f->task_thread.task_head)
            ttd->cur++;
        t->next = NULL;
        // we don't need to check cond_signaled here, since we found a task
        // after the last signal so we want to re-signal the next waiting thread
        // and again won't need to signal after that
        atomic_store(&ttd->cond_signaled, 1);
        pthread_cond_signal(&ttd->cond);
        pthread_mutex_unlock(&ttd->lock);
    found_unlocked:;
        const int flush = atomic_load(c->flush);
        int error = atomic_fetch_or(&f->task_thread.error, flush) | flush;

        // run it
        int sby = t->sby;
        switch (t->type) {
        case DAV1D_TASK_TYPE_INIT: {
            assert(c->n_fc > 1);
            int res = dav1d_decode_frame_init(f);
            int p1 = f->in_cdf.progress ? atomic_load(f->in_cdf.progress) : 1;
            if (res || p1 == TILE_ERROR) {
                pthread_mutex_lock(&ttd->lock);
                abort_frame(f, res ? res : DAV1D_ERR(EINVAL));
                reset_task_cur(c, ttd, t->frame_idx);
            } else {
                t->type = DAV1D_TASK_TYPE_INIT_CDF;
                if (p1) goto found_unlocked;
                add_pending(f, t);
                pthread_mutex_lock(&ttd->lock);
            }
            continue;
        }
        case DAV1D_TASK_TYPE_INIT_CDF: {
            assert(c->n_fc > 1);
            int res = DAV1D_ERR(EINVAL);
            if (!atomic_load(&f->task_thread.error))
                res = dav1d_decode_frame_init_cdf(f);
            if (f->frame_hdr->refresh_context && !f->task_thread.update_set) {
                atomic_store(f->out_cdf.progress, res < 0 ? TILE_ERROR : 1);
            }
            if (!res) {
                for (int p = 1; p <= 2; p++) {
                    const int res = dav1d_task_create_tile_sbrow(f, p, 0);
                    if (res) {
                        pthread_mutex_lock(&ttd->lock);
                        // memory allocation failed
                        atomic_store(&f->task_thread.done[2 - p], 1);
                        atomic_store(&f->task_thread.error, -1);
                        atomic_fetch_sub(&f->task_thread.task_counter,
                                         f->frame_hdr->tiling.cols *
                                         f->frame_hdr->tiling.rows + f->sbh);
                        atomic_store(&f->sr_cur.progress[p - 1], FRAME_ERROR);
                        if (p == 2 && atomic_load(&f->task_thread.done[1])) {
                            assert(!atomic_load(&f->task_thread.task_counter));
                            dav1d_decode_frame_exit(f, DAV1D_ERR(ENOMEM));
                            f->n_tile_data = 0;
                            pthread_cond_signal(&f->task_thread.cond);
                        } else {
                            pthread_mutex_unlock(&ttd->lock);
                        }
                    }
                }
                pthread_mutex_lock(&ttd->lock);
            } else {
                pthread_mutex_lock(&ttd->lock);
                abort_frame(f, res);
                reset_task_cur(c, ttd, t->frame_idx);
                atomic_store(&f->task_thread.init_done, 1);
            }
            continue;
        }
        case DAV1D_TASK_TYPE_TILE_ENTROPY:
        case DAV1D_TASK_TYPE_TILE_RECONSTRUCTION: {
            const int p = t->type == DAV1D_TASK_TYPE_TILE_ENTROPY;
            const int tile_idx = (int)(t - f->task_thread.tile_tasks[p]);
            Dav1dTileState *const ts = &f->ts[tile_idx];

            tc->ts = ts;
            tc->by = sby << f->sb_shift;
            const int uses_2pass = c->n_fc > 1;
            tc->frame_thread.pass = !uses_2pass ? 0 :
                1 + (t->type == DAV1D_TASK_TYPE_TILE_RECONSTRUCTION);
            if (!error) error = dav1d_decode_tile_sbrow(tc);
            const int progress = error ? TILE_ERROR : 1 + sby;

            // signal progress
            atomic_fetch_or(&f->task_thread.error, error);
            if (((sby + 1) << f->sb_shift) < ts->tiling.row_end) {
                t->sby++;
                t->deps_skip = 0;
                if (!check_tile(t, f, uses_2pass)) {
                    atomic_store(&ts->progress[p], progress);
                    reset_task_cur_async(ttd, t->frame_idx, c->n_fc);
                    if (!atomic_fetch_or(&ttd->cond_signaled, 1))
                        pthread_cond_signal(&ttd->cond);
                    goto found_unlocked;
                }
                atomic_store(&ts->progress[p], progress);
                add_pending(f, t);
                pthread_mutex_lock(&ttd->lock);
            } else {
                pthread_mutex_lock(&ttd->lock);
                atomic_store(&ts->progress[p], progress);
                reset_task_cur(c, ttd, t->frame_idx);
                error = atomic_load(&f->task_thread.error);
                if (f->frame_hdr->refresh_context &&
                    tc->frame_thread.pass <= 1 && f->task_thread.update_set &&
                    f->frame_hdr->tiling.update == tile_idx)
                {
                    if (!error)
                        dav1d_cdf_thread_update(f->frame_hdr, f->out_cdf.data.cdf,
                                                &f->ts[f->frame_hdr->tiling.update].cdf);
                    if (c->n_fc > 1)
                        atomic_store(f->out_cdf.progress, error ? TILE_ERROR : 1);
                }
                if (atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1 == 0 &&
                    atomic_load(&f->task_thread.done[0]) &&
                    (!uses_2pass || atomic_load(&f->task_thread.done[1])))
                {
                    error = atomic_load(&f->task_thread.error);
                    dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
                                            error ? DAV1D_ERR(ENOMEM) : 0);
                    f->n_tile_data = 0;
                    pthread_cond_signal(&f->task_thread.cond);
                }
                assert(atomic_load(&f->task_thread.task_counter) >= 0);
                if (!atomic_fetch_or(&ttd->cond_signaled, 1))
                    pthread_cond_signal(&ttd->cond);
            }
            continue;
        }
:
811 if (!atomic_load(&f
->task_thread
.error
))
812 f
->bd_fn
.filter_sbrow_deblock_cols(f
, sby
);
813 if (ensure_progress(ttd
, f
, t
, DAV1D_TASK_TYPE_DEBLOCK_ROWS
,
814 &f
->frame_thread
.deblock_progress
,
815 &t
->deblock_progress
)) continue;
817 case DAV1D_TASK_TYPE_DEBLOCK_ROWS
:
818 if (!atomic_load(&f
->task_thread
.error
))
819 f
->bd_fn
.filter_sbrow_deblock_rows(f
, sby
);
820 // signal deblock progress
821 if (f
->frame_hdr
->loopfilter
.level_y
[0] ||
822 f
->frame_hdr
->loopfilter
.level_y
[1])
824 error
= atomic_load(&f
->task_thread
.error
);
825 atomic_store(&f
->frame_thread
.deblock_progress
,
826 error
? TILE_ERROR
: sby
+ 1);
827 reset_task_cur_async(ttd
, t
->frame_idx
, c
->n_fc
);
828 if (!atomic_fetch_or(&ttd
->cond_signaled
, 1))
829 pthread_cond_signal(&ttd
->cond
);
830 } else if (f
->seq_hdr
->cdef
|| f
->lf
.restore_planes
) {
831 atomic_fetch_or(&f
->frame_thread
.copy_lpf_progress
[sby
>> 5],
833 // CDEF needs the top buffer to be saved by lr_copy_lpf of the
836 int prog
= atomic_load(&f
->frame_thread
.copy_lpf_progress
[(sby
- 1) >> 5]);
837 if (~prog
& (1U << ((sby
- 1) & 31))) {
838 t
->type
= DAV1D_TASK_TYPE_CDEF
;
839 t
->recon_progress
= t
->deblock_progress
= 0;
841 pthread_mutex_lock(&ttd
->lock
);
        case DAV1D_TASK_TYPE_CDEF:
            if (f->seq_hdr->cdef) {
                if (!atomic_load(&f->task_thread.error))
                    f->bd_fn.filter_sbrow_cdef(tc, sby);
                reset_task_cur_async(ttd, t->frame_idx, c->n_fc);
                if (!atomic_fetch_or(&ttd->cond_signaled, 1))
                    pthread_cond_signal(&ttd->cond);
            }
            // fall-through
        case DAV1D_TASK_TYPE_SUPER_RESOLUTION:
            if (f->frame_hdr->width[0] != f->frame_hdr->width[1])
                if (!atomic_load(&f->task_thread.error))
                    f->bd_fn.filter_sbrow_resize(f, sby);
            // fall-through
        case DAV1D_TASK_TYPE_LOOP_RESTORATION:
            if (!atomic_load(&f->task_thread.error) && f->lf.restore_planes)
                f->bd_fn.filter_sbrow_lr(f, sby);
            // fall-through
        case DAV1D_TASK_TYPE_RECONSTRUCTION_PROGRESS:
            // dummy to cover for no post-filters
        case DAV1D_TASK_TYPE_ENTROPY_PROGRESS:
            // dummy to convert tile progress to frame
            break;
        default: abort();
        }

        // if task completed [typically LR], signal picture progress as per below
        const int uses_2pass = c->n_fc > 1;
        const int sbh = f->sbh;
        const int sbsz = f->sb_step * 4;
        if (t->type == DAV1D_TASK_TYPE_ENTROPY_PROGRESS) {
            error = atomic_load(&f->task_thread.error);
            const unsigned y = sby + 1 == sbh ? UINT_MAX : (unsigned)(sby + 1) * sbsz;
            assert(c->n_fc > 1);
            if (f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */)
                atomic_store(&f->sr_cur.progress[0], error ? FRAME_ERROR : y);
            atomic_store(&f->frame_thread.entropy_progress,
                         error ? TILE_ERROR : sby + 1);
            if (sby + 1 == sbh)
                atomic_store(&f->task_thread.done[1], 1);
            pthread_mutex_lock(&ttd->lock);
            const int num_tasks = atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1;
            if (sby + 1 < sbh && num_tasks) {
                reset_task_cur(c, ttd, t->frame_idx);
                continue;
            }
            if (!num_tasks && atomic_load(&f->task_thread.done[0]) &&
                atomic_load(&f->task_thread.done[1]))
            {
                error = atomic_load(&f->task_thread.error);
                dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
                                        error ? DAV1D_ERR(ENOMEM) : 0);
                f->n_tile_data = 0;
                pthread_cond_signal(&f->task_thread.cond);
            }
            reset_task_cur(c, ttd, t->frame_idx);
            continue;
        }
        // t->type != DAV1D_TASK_TYPE_ENTROPY_PROGRESS
        atomic_fetch_or(&f->frame_thread.frame_progress[sby >> 5],
                        1U << (sby & 31));
        pthread_mutex_lock(&f->task_thread.lock);
        sby = get_frame_progress(c, f);
        error = atomic_load(&f->task_thread.error);
        const unsigned y = sby + 1 == sbh ? UINT_MAX : (unsigned)(sby + 1) * sbsz;
        if (c->n_fc > 1 && f->sr_cur.p.data[0] /* upon flush, this can be free'ed already */)
            atomic_store(&f->sr_cur.progress[1], error ? FRAME_ERROR : y);
        pthread_mutex_unlock(&f->task_thread.lock);
        if (sby + 1 == sbh)
            atomic_store(&f->task_thread.done[0], 1);
        pthread_mutex_lock(&ttd->lock);
        const int num_tasks = atomic_fetch_sub(&f->task_thread.task_counter, 1) - 1;
        if (sby + 1 < sbh && num_tasks) {
            reset_task_cur(c, ttd, t->frame_idx);
            continue;
        }
        if (!num_tasks && atomic_load(&f->task_thread.done[0]) &&
            (!uses_2pass || atomic_load(&f->task_thread.done[1])))
        {
            error = atomic_load(&f->task_thread.error);
            dav1d_decode_frame_exit(f, error == 1 ? DAV1D_ERR(EINVAL) :
                                    error ? DAV1D_ERR(ENOMEM) : 0);
            f->n_tile_data = 0;
            pthread_cond_signal(&f->task_thread.cond);
        }
        reset_task_cur(c, ttd, t->frame_idx);
    }
    pthread_mutex_unlock(&ttd->lock);

    return NULL;
}