2 * Background jobs (long-running operations)
4 * Copyright (c) 2011 IBM Corp.
5 * Copyright (c) 2012, 2018 Red Hat, Inc.
7 * Permission is hereby granted, free of charge, to any person obtaining a copy
8 * of this software and associated documentation files (the "Software"), to deal
9 * in the Software without restriction, including without limitation the rights
10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 * copies of the Software, and to permit persons to whom the Software is
12 * furnished to do so, subject to the following conditions:
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26 #include "qemu/osdep.h"
27 #include "qapi/error.h"
30 #include "qemu/main-loop.h"
31 #include "block/aio-wait.h"
32 #include "trace/trace-root.h"
33 #include "qapi/qapi-events-job.h"
/*
 * The job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor. The monitor is
 * peculiar in that it accesses the job list with job_get, and
 * therefore needs consistency across job_get and the actual operation
 * (e.g. job_user_cancel). To achieve this consistency, the caller
 * calls job_lock/job_unlock itself around the whole operation.
 *
 * The second includes functions used by the job drivers and sometimes
 * by the core block layer. These delegate the locking to the callee instead.
 *
 * TODO Actually make this true
 */
/*
 * job_mutex protects the jobs list, but also makes the
 * struct job fields thread-safe.
 */
57 /* Protected by job_mutex */
58 static QLIST_HEAD(, Job
) jobs
= QLIST_HEAD_INITIALIZER(jobs
);
60 /* Job State Transition Table */
61 bool JobSTT
[JOB_STATUS__MAX
][JOB_STATUS__MAX
] = {
62 /* U, C, R, P, Y, S, W, D, X, E, N */
63 /* U: */ [JOB_STATUS_UNDEFINED
] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
64 /* C: */ [JOB_STATUS_CREATED
] = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
65 /* R: */ [JOB_STATUS_RUNNING
] = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
66 /* P: */ [JOB_STATUS_PAUSED
] = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
67 /* Y: */ [JOB_STATUS_READY
] = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
68 /* S: */ [JOB_STATUS_STANDBY
] = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
69 /* W: */ [JOB_STATUS_WAITING
] = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
70 /* D: */ [JOB_STATUS_PENDING
] = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
71 /* X: */ [JOB_STATUS_ABORTING
] = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
72 /* E: */ [JOB_STATUS_CONCLUDED
] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
73 /* N: */ [JOB_STATUS_NULL
] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
76 bool JobVerbTable
[JOB_VERB__MAX
][JOB_STATUS__MAX
] = {
77 /* U, C, R, P, Y, S, W, D, X, E, N */
78 [JOB_VERB_CANCEL
] = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
79 [JOB_VERB_PAUSE
] = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
80 [JOB_VERB_RESUME
] = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
81 [JOB_VERB_SET_SPEED
] = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
82 [JOB_VERB_COMPLETE
] = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
83 [JOB_VERB_FINALIZE
] = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
84 [JOB_VERB_DISMISS
] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
87 /* Transactional group of jobs */
90 /* Is this txn being cancelled? */
94 QLIST_HEAD(, Job
) jobs
;
105 void job_unlock(void)
110 static void real_job_lock(void)
112 qemu_mutex_lock(&job_mutex
);
115 static void real_job_unlock(void)
117 qemu_mutex_unlock(&job_mutex
);
120 static void __attribute__((__constructor__
)) job_init(void)
122 qemu_mutex_init(&job_mutex
);
125 JobTxn
*job_txn_new(void)
127 JobTxn
*txn
= g_new0(JobTxn
, 1);
128 QLIST_INIT(&txn
->jobs
);
133 /* Called with job_mutex held. */
134 static void job_txn_ref_locked(JobTxn
*txn
)
139 void job_txn_unref_locked(JobTxn
*txn
)
141 if (txn
&& --txn
->refcnt
== 0) {
146 void job_txn_unref(JobTxn
*txn
)
149 job_txn_unref_locked(txn
);
153 * @txn: The transaction (may be NULL)
154 * @job: Job to add to the transaction
156 * Add @job to the transaction. The @job must not already be in a transaction.
157 * The caller must call either job_txn_unref() or job_completed() to release
158 * the reference that is automatically grabbed here.
160 * If @txn is NULL, the function does nothing.
162 * Called with job_mutex held.
164 static void job_txn_add_job_locked(JobTxn
*txn
, Job
*job
)
173 QLIST_INSERT_HEAD(&txn
->jobs
, job
, txn_list
);
174 job_txn_ref_locked(txn
);
177 /* Called with job_mutex held. */
178 static void job_txn_del_job_locked(Job
*job
)
181 QLIST_REMOVE(job
, txn_list
);
182 job_txn_unref_locked(job
->txn
);
187 /* Called with job_mutex held, but releases it temporarily. */
188 static int job_txn_apply_locked(Job
*job
, int fn(Job
*))
190 AioContext
*inner_ctx
;
191 Job
*other_job
, *next
;
192 JobTxn
*txn
= job
->txn
;
196 * Similar to job_completed_txn_abort, we take each job's lock before
197 * applying fn, but since we assume that outer_ctx is held by the caller,
198 * we need to release it here to avoid holding the lock twice - which would
199 * break AIO_WAIT_WHILE from within fn.
202 aio_context_release(job
->aio_context
);
204 QLIST_FOREACH_SAFE(other_job
, &txn
->jobs
, txn_list
, next
) {
205 inner_ctx
= other_job
->aio_context
;
206 aio_context_acquire(inner_ctx
);
208 aio_context_release(inner_ctx
);
215 * Note that job->aio_context might have been changed by calling fn, so we
216 * can't use a local variable to cache it.
218 aio_context_acquire(job
->aio_context
);
219 job_unref_locked(job
);
223 bool job_is_internal(Job
*job
)
225 return (job
->id
== NULL
);
228 /* Called with job_mutex held. */
229 static void job_state_transition_locked(Job
*job
, JobStatus s1
)
231 JobStatus s0
= job
->status
;
232 assert(s1
>= 0 && s1
< JOB_STATUS__MAX
);
233 trace_job_state_transition(job
, job
->ret
,
234 JobSTT
[s0
][s1
] ? "allowed" : "disallowed",
235 JobStatus_str(s0
), JobStatus_str(s1
));
236 assert(JobSTT
[s0
][s1
]);
239 if (!job_is_internal(job
) && s1
!= s0
) {
240 qapi_event_send_job_status_change(job
->id
, job
->status
);
244 int job_apply_verb_locked(Job
*job
, JobVerb verb
, Error
**errp
)
246 JobStatus s0
= job
->status
;
247 assert(verb
>= 0 && verb
< JOB_VERB__MAX
);
248 trace_job_apply_verb(job
, JobStatus_str(s0
), JobVerb_str(verb
),
249 JobVerbTable
[verb
][s0
] ? "allowed" : "prohibited");
250 if (JobVerbTable
[verb
][s0
]) {
253 error_setg(errp
, "Job '%s' in state '%s' cannot accept command verb '%s'",
254 job
->id
, JobStatus_str(s0
), JobVerb_str(verb
));
258 int job_apply_verb(Job
*job
, JobVerb verb
, Error
**errp
)
261 return job_apply_verb_locked(job
, verb
, errp
);
264 JobType
job_type(const Job
*job
)
266 return job
->driver
->job_type
;
269 const char *job_type_str(const Job
*job
)
271 return JobType_str(job_type(job
));
274 bool job_is_cancelled_locked(Job
*job
)
276 /* force_cancel may be true only if cancelled is true, too */
277 assert(job
->cancelled
|| !job
->force_cancel
);
278 return job
->force_cancel
;
281 bool job_is_cancelled(Job
*job
)
284 return job_is_cancelled_locked(job
);
287 /* Called with job_mutex held. */
288 static bool job_cancel_requested_locked(Job
*job
)
290 return job
->cancelled
;
293 bool job_cancel_requested(Job
*job
)
296 return job_cancel_requested_locked(job
);
299 bool job_is_ready_locked(Job
*job
)
301 switch (job
->status
) {
302 case JOB_STATUS_UNDEFINED
:
303 case JOB_STATUS_CREATED
:
304 case JOB_STATUS_RUNNING
:
305 case JOB_STATUS_PAUSED
:
306 case JOB_STATUS_WAITING
:
307 case JOB_STATUS_PENDING
:
308 case JOB_STATUS_ABORTING
:
309 case JOB_STATUS_CONCLUDED
:
310 case JOB_STATUS_NULL
:
312 case JOB_STATUS_READY
:
313 case JOB_STATUS_STANDBY
:
316 g_assert_not_reached();
321 bool job_is_ready(Job
*job
)
324 return job_is_ready_locked(job
);
327 bool job_is_completed_locked(Job
*job
)
329 switch (job
->status
) {
330 case JOB_STATUS_UNDEFINED
:
331 case JOB_STATUS_CREATED
:
332 case JOB_STATUS_RUNNING
:
333 case JOB_STATUS_PAUSED
:
334 case JOB_STATUS_READY
:
335 case JOB_STATUS_STANDBY
:
337 case JOB_STATUS_WAITING
:
338 case JOB_STATUS_PENDING
:
339 case JOB_STATUS_ABORTING
:
340 case JOB_STATUS_CONCLUDED
:
341 case JOB_STATUS_NULL
:
344 g_assert_not_reached();
349 bool job_is_completed(Job
*job
)
352 return job_is_completed_locked(job
);
355 static bool job_started_locked(Job
*job
)
360 /* Called with job_mutex held. */
361 static bool job_should_pause_locked(Job
*job
)
363 return job
->pause_count
> 0;
366 Job
*job_next_locked(Job
*job
)
369 return QLIST_FIRST(&jobs
);
371 return QLIST_NEXT(job
, job_list
);
374 Job
*job_next(Job
*job
)
377 return job_next_locked(job
);
380 Job
*job_get_locked(const char *id
)
384 QLIST_FOREACH(job
, &jobs
, job_list
) {
385 if (job
->id
&& !strcmp(id
, job
->id
)) {
393 Job
*job_get(const char *id
)
396 return job_get_locked(id
);
399 void job_set_aio_context(Job
*job
, AioContext
*ctx
)
401 /* protect against read in job_finish_sync_locked and job_start */
403 /* protect against read in job_do_yield_locked */
405 /* ensure the job is quiescent while the AioContext is changed */
406 assert(job
->paused
|| job_is_completed_locked(job
));
407 job
->aio_context
= ctx
;
410 /* Called with job_mutex *not* held. */
411 static void job_sleep_timer_cb(void *opaque
)
418 void *job_create(const char *job_id
, const JobDriver
*driver
, JobTxn
*txn
,
419 AioContext
*ctx
, int flags
, BlockCompletionFunc
*cb
,
420 void *opaque
, Error
**errp
)
427 if (flags
& JOB_INTERNAL
) {
428 error_setg(errp
, "Cannot specify job ID for internal job");
431 if (!id_wellformed(job_id
)) {
432 error_setg(errp
, "Invalid job ID '%s'", job_id
);
435 if (job_get_locked(job_id
)) {
436 error_setg(errp
, "Job ID '%s' already in use", job_id
);
439 } else if (!(flags
& JOB_INTERNAL
)) {
440 error_setg(errp
, "An explicit job ID is required");
444 job
= g_malloc0(driver
->instance_size
);
445 job
->driver
= driver
;
446 job
->id
= g_strdup(job_id
);
448 job
->aio_context
= ctx
;
451 job
->pause_count
= 1;
452 job
->auto_finalize
= !(flags
& JOB_MANUAL_FINALIZE
);
453 job
->auto_dismiss
= !(flags
& JOB_MANUAL_DISMISS
);
455 job
->opaque
= opaque
;
457 progress_init(&job
->progress
);
459 notifier_list_init(&job
->on_finalize_cancelled
);
460 notifier_list_init(&job
->on_finalize_completed
);
461 notifier_list_init(&job
->on_pending
);
462 notifier_list_init(&job
->on_ready
);
463 notifier_list_init(&job
->on_idle
);
465 job_state_transition_locked(job
, JOB_STATUS_CREATED
);
466 aio_timer_init(qemu_get_aio_context(), &job
->sleep_timer
,
467 QEMU_CLOCK_REALTIME
, SCALE_NS
,
468 job_sleep_timer_cb
, job
);
470 QLIST_INSERT_HEAD(&jobs
, job
, job_list
);
472 /* Single jobs are modeled as single-job transactions for sake of
473 * consolidating the job management logic */
476 job_txn_add_job_locked(txn
, job
);
477 job_txn_unref_locked(txn
);
479 job_txn_add_job_locked(txn
, job
);
485 void job_ref_locked(Job
*job
)
490 void job_ref(Job
*job
)
496 void job_unref_locked(Job
*job
)
500 if (--job
->refcnt
== 0) {
501 assert(job
->status
== JOB_STATUS_NULL
);
502 assert(!timer_pending(&job
->sleep_timer
));
505 if (job
->driver
->free
) {
507 job
->driver
->free(job
);
511 QLIST_REMOVE(job
, job_list
);
513 progress_destroy(&job
->progress
);
514 error_free(job
->err
);
520 void job_unref(Job
*job
)
523 job_unref_locked(job
);
526 void job_progress_update(Job
*job
, uint64_t done
)
528 progress_work_done(&job
->progress
, done
);
531 void job_progress_set_remaining(Job
*job
, uint64_t remaining
)
533 progress_set_remaining(&job
->progress
, remaining
);
536 void job_progress_increase_remaining(Job
*job
, uint64_t delta
)
538 progress_increase_remaining(&job
->progress
, delta
);
542 * To be called when a cancelled job is finalised.
543 * Called with job_mutex held.
545 static void job_event_cancelled_locked(Job
*job
)
547 notifier_list_notify(&job
->on_finalize_cancelled
, job
);
551 * To be called when a successfully completed job is finalised.
552 * Called with job_mutex held.
554 static void job_event_completed_locked(Job
*job
)
556 notifier_list_notify(&job
->on_finalize_completed
, job
);
559 /* Called with job_mutex held. */
560 static void job_event_pending_locked(Job
*job
)
562 notifier_list_notify(&job
->on_pending
, job
);
565 /* Called with job_mutex held. */
566 static void job_event_ready_locked(Job
*job
)
568 notifier_list_notify(&job
->on_ready
, job
);
571 /* Called with job_mutex held. */
572 static void job_event_idle_locked(Job
*job
)
574 notifier_list_notify(&job
->on_idle
, job
);
577 void job_enter_cond_locked(Job
*job
, bool(*fn
)(Job
*job
))
579 if (!job_started_locked(job
)) {
582 if (job
->deferred_to_main_loop
) {
592 if (fn
&& !fn(job
)) {
597 assert(!job
->deferred_to_main_loop
);
598 timer_del(&job
->sleep_timer
);
602 aio_co_wake(job
->co
);
606 void job_enter_cond(Job
*job
, bool(*fn
)(Job
*job
))
609 job_enter_cond_locked(job
, fn
);
612 void job_enter(Job
*job
)
615 job_enter_cond_locked(job
, NULL
);
618 /* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
619 * Reentering the job coroutine with job_enter() before the timer has expired
620 * is allowed and cancels the timer.
622 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
625 * Called with job_mutex held, but releases it temporarily.
627 static void coroutine_fn
job_do_yield_locked(Job
*job
, uint64_t ns
)
629 AioContext
*next_aio_context
;
633 timer_mod(&job
->sleep_timer
, ns
);
636 job_event_idle_locked(job
);
639 qemu_coroutine_yield();
642 next_aio_context
= job
->aio_context
;
644 * Coroutine has resumed, but in the meanwhile the job AioContext
645 * might have changed via bdrv_try_set_aio_context(), so we need to move
646 * the coroutine too in the new aiocontext.
648 while (qemu_get_current_aio_context() != next_aio_context
) {
650 aio_co_reschedule_self(next_aio_context
);
652 next_aio_context
= job
->aio_context
;
655 /* Set by job_enter_cond_locked() before re-entering the coroutine. */
659 /* Called with job_mutex held, but releases it temporarily. */
660 static void coroutine_fn
job_pause_point_locked(Job
*job
)
662 assert(job
&& job_started_locked(job
));
664 if (!job_should_pause_locked(job
)) {
667 if (job_is_cancelled_locked(job
)) {
671 if (job
->driver
->pause
) {
673 job
->driver
->pause(job
);
677 if (job_should_pause_locked(job
) && !job_is_cancelled_locked(job
)) {
678 JobStatus status
= job
->status
;
679 job_state_transition_locked(job
, status
== JOB_STATUS_READY
681 : JOB_STATUS_PAUSED
);
683 job_do_yield_locked(job
, -1);
685 job_state_transition_locked(job
, status
);
688 if (job
->driver
->resume
) {
690 job
->driver
->resume(job
);
695 void coroutine_fn
job_pause_point(Job
*job
)
698 job_pause_point_locked(job
);
701 static void coroutine_fn
job_yield_locked(Job
*job
)
705 /* Check cancellation *before* setting busy = false, too! */
706 if (job_is_cancelled_locked(job
)) {
710 if (!job_should_pause_locked(job
)) {
711 job_do_yield_locked(job
, -1);
714 job_pause_point_locked(job
);
717 void coroutine_fn
job_yield(Job
*job
)
720 job_yield_locked(job
);
723 void coroutine_fn
job_sleep_ns(Job
*job
, int64_t ns
)
728 /* Check cancellation *before* setting busy = false, too! */
729 if (job_is_cancelled_locked(job
)) {
733 if (!job_should_pause_locked(job
)) {
734 job_do_yield_locked(job
, qemu_clock_get_ns(QEMU_CLOCK_REALTIME
) + ns
);
737 job_pause_point_locked(job
);
740 /* Assumes the job_mutex is held */
741 static bool job_timer_not_pending_locked(Job
*job
)
743 return !timer_pending(&job
->sleep_timer
);
746 void job_pause_locked(Job
*job
)
750 job_enter_cond_locked(job
, NULL
);
754 void job_pause(Job
*job
)
757 job_pause_locked(job
);
760 void job_resume_locked(Job
*job
)
762 assert(job
->pause_count
> 0);
764 if (job
->pause_count
) {
768 /* kick only if no timer is pending */
769 job_enter_cond_locked(job
, job_timer_not_pending_locked
);
772 void job_resume(Job
*job
)
775 job_resume_locked(job
);
778 void job_user_pause_locked(Job
*job
, Error
**errp
)
780 if (job_apply_verb_locked(job
, JOB_VERB_PAUSE
, errp
)) {
783 if (job
->user_paused
) {
784 error_setg(errp
, "Job is already paused");
787 job
->user_paused
= true;
788 job_pause_locked(job
);
791 void job_user_pause(Job
*job
, Error
**errp
)
794 job_user_pause_locked(job
, errp
);
797 bool job_user_paused_locked(Job
*job
)
799 return job
->user_paused
;
802 bool job_user_paused(Job
*job
)
805 return job_user_paused_locked(job
);
808 void job_user_resume_locked(Job
*job
, Error
**errp
)
812 if (!job
->user_paused
|| job
->pause_count
<= 0) {
813 error_setg(errp
, "Can't resume a job that was not paused");
816 if (job_apply_verb_locked(job
, JOB_VERB_RESUME
, errp
)) {
819 if (job
->driver
->user_resume
) {
821 job
->driver
->user_resume(job
);
824 job
->user_paused
= false;
825 job_resume_locked(job
);
828 void job_user_resume(Job
*job
, Error
**errp
)
831 job_user_resume_locked(job
, errp
);
834 /* Called with job_mutex held, but releases it temporarily. */
835 static void job_do_dismiss_locked(Job
*job
)
840 job
->deferred_to_main_loop
= true;
842 job_txn_del_job_locked(job
);
844 job_state_transition_locked(job
, JOB_STATUS_NULL
);
845 job_unref_locked(job
);
848 void job_dismiss_locked(Job
**jobptr
, Error
**errp
)
851 /* similarly to _complete, this is QMP-interface only. */
853 if (job_apply_verb_locked(job
, JOB_VERB_DISMISS
, errp
)) {
857 job_do_dismiss_locked(job
);
861 void job_dismiss(Job
**jobptr
, Error
**errp
)
864 job_dismiss_locked(jobptr
, errp
);
867 void job_early_fail(Job
*job
)
870 assert(job
->status
== JOB_STATUS_CREATED
);
871 job_do_dismiss_locked(job
);
874 /* Called with job_mutex held. */
875 static void job_conclude_locked(Job
*job
)
877 job_state_transition_locked(job
, JOB_STATUS_CONCLUDED
);
878 if (job
->auto_dismiss
|| !job_started_locked(job
)) {
879 job_do_dismiss_locked(job
);
883 /* Called with job_mutex held. */
884 static void job_update_rc_locked(Job
*job
)
886 if (!job
->ret
&& job_is_cancelled_locked(job
)) {
887 job
->ret
= -ECANCELED
;
891 error_setg(&job
->err
, "%s", strerror(-job
->ret
));
893 job_state_transition_locked(job
, JOB_STATUS_ABORTING
);
897 static void job_commit(Job
*job
)
901 if (job
->driver
->commit
) {
902 job
->driver
->commit(job
);
906 static void job_abort(Job
*job
)
910 if (job
->driver
->abort
) {
911 job
->driver
->abort(job
);
915 static void job_clean(Job
*job
)
918 if (job
->driver
->clean
) {
919 job
->driver
->clean(job
);
923 /* Called with job_mutex held, but releases it temporarily */
924 static int job_finalize_single_locked(Job
*job
)
928 assert(job_is_completed_locked(job
));
930 /* Ensure abort is called for late-transactional failures */
931 job_update_rc_locked(job
);
948 job
->cb(job
->opaque
, job_ret
);
952 /* Emit events only if we actually started */
953 if (job_started_locked(job
)) {
954 if (job_is_cancelled_locked(job
)) {
955 job_event_cancelled_locked(job
);
957 job_event_completed_locked(job
);
961 job_txn_del_job_locked(job
);
962 job_conclude_locked(job
);
966 /* Called with job_mutex held, but releases it temporarily */
967 static void job_cancel_async_locked(Job
*job
, bool force
)
970 if (job
->driver
->cancel
) {
972 force
= job
->driver
->cancel(job
, force
);
975 /* No .cancel() means the job will behave as if force-cancelled */
979 if (job
->user_paused
) {
980 /* Do not call job_enter here, the caller will handle it. */
981 if (job
->driver
->user_resume
) {
983 job
->driver
->user_resume(job
);
986 job
->user_paused
= false;
987 assert(job
->pause_count
> 0);
992 * Ignore soft cancel requests after the job is already done
993 * (We will still invoke job->driver->cancel() above, but if the
994 * job driver supports soft cancelling and the job is done, that
995 * should be a no-op, too. We still call it so it can override
998 if (force
|| !job
->deferred_to_main_loop
) {
999 job
->cancelled
= true;
1000 /* To prevent 'force == false' overriding a previous 'force == true' */
1001 job
->force_cancel
|= force
;
1005 /* Called with job_mutex held, but releases it temporarily. */
1006 static void job_completed_txn_abort_locked(Job
*job
)
1009 JobTxn
*txn
= job
->txn
;
1012 if (txn
->aborting
) {
1014 * We are cancelled by another job, which will handle everything.
1018 txn
->aborting
= true;
1019 job_txn_ref_locked(txn
);
1022 * We can only hold the single job's AioContext lock while calling
1023 * job_finalize_single() because the finalization callbacks can involve
1024 * calls of AIO_WAIT_WHILE(), which could deadlock otherwise.
1025 * Note that the job's AioContext may change when it is finalized.
1027 job_ref_locked(job
);
1028 aio_context_release(job
->aio_context
);
1030 /* Other jobs are effectively cancelled by us, set the status for
1031 * them; this job, however, may or may not be cancelled, depending
1032 * on the caller, so leave it. */
1033 QLIST_FOREACH(other_job
, &txn
->jobs
, txn_list
) {
1034 if (other_job
!= job
) {
1035 ctx
= other_job
->aio_context
;
1036 aio_context_acquire(ctx
);
1038 * This is a transaction: If one job failed, no result will matter.
1039 * Therefore, pass force=true to terminate all other jobs as quickly
1042 job_cancel_async_locked(other_job
, true);
1043 aio_context_release(ctx
);
1046 while (!QLIST_EMPTY(&txn
->jobs
)) {
1047 other_job
= QLIST_FIRST(&txn
->jobs
);
1049 * The job's AioContext may change, so store it in @ctx so we
1050 * release the same context that we have acquired before.
1052 ctx
= other_job
->aio_context
;
1053 aio_context_acquire(ctx
);
1054 if (!job_is_completed_locked(other_job
)) {
1055 assert(job_cancel_requested_locked(other_job
));
1056 job_finish_sync_locked(other_job
, NULL
, NULL
);
1058 job_finalize_single_locked(other_job
);
1059 aio_context_release(ctx
);
1063 * Use job_ref()/job_unref() so we can read the AioContext here
1064 * even if the job went away during job_finalize_single().
1066 aio_context_acquire(job
->aio_context
);
1067 job_unref_locked(job
);
1069 job_txn_unref_locked(txn
);
1072 /* Called with job_mutex held, but releases it temporarily */
1073 static int job_prepare_locked(Job
*job
)
1077 GLOBAL_STATE_CODE();
1078 if (job
->ret
== 0 && job
->driver
->prepare
) {
1080 ret
= job
->driver
->prepare(job
);
1083 job_update_rc_locked(job
);
1088 /* Called with job_mutex held */
1089 static int job_needs_finalize_locked(Job
*job
)
1091 return !job
->auto_finalize
;
1094 /* Called with job_mutex held */
1095 static void job_do_finalize_locked(Job
*job
)
1098 assert(job
&& job
->txn
);
1100 /* prepare the transaction to complete */
1101 rc
= job_txn_apply_locked(job
, job_prepare_locked
);
1103 job_completed_txn_abort_locked(job
);
1105 job_txn_apply_locked(job
, job_finalize_single_locked
);
1109 void job_finalize_locked(Job
*job
, Error
**errp
)
1111 assert(job
&& job
->id
);
1112 if (job_apply_verb_locked(job
, JOB_VERB_FINALIZE
, errp
)) {
1115 job_do_finalize_locked(job
);
1118 void job_finalize(Job
*job
, Error
**errp
)
1121 job_finalize_locked(job
, errp
);
1124 /* Called with job_mutex held. */
1125 static int job_transition_to_pending_locked(Job
*job
)
1127 job_state_transition_locked(job
, JOB_STATUS_PENDING
);
1128 if (!job
->auto_finalize
) {
1129 job_event_pending_locked(job
);
1134 void job_transition_to_ready(Job
*job
)
1137 job_state_transition_locked(job
, JOB_STATUS_READY
);
1138 job_event_ready_locked(job
);
1141 /* Called with job_mutex held. */
1142 static void job_completed_txn_success_locked(Job
*job
)
1144 JobTxn
*txn
= job
->txn
;
1147 job_state_transition_locked(job
, JOB_STATUS_WAITING
);
1150 * Successful completion, see if there are other running jobs in this
1153 QLIST_FOREACH(other_job
, &txn
->jobs
, txn_list
) {
1154 if (!job_is_completed_locked(other_job
)) {
1157 assert(other_job
->ret
== 0);
1160 job_txn_apply_locked(job
, job_transition_to_pending_locked
);
1162 /* If no jobs need manual finalization, automatically do so */
1163 if (job_txn_apply_locked(job
, job_needs_finalize_locked
) == 0) {
1164 job_do_finalize_locked(job
);
1168 /* Called with job_mutex held. */
1169 static void job_completed_locked(Job
*job
)
1171 assert(job
&& job
->txn
&& !job_is_completed_locked(job
));
1173 job_update_rc_locked(job
);
1174 trace_job_completed(job
, job
->ret
);
1176 job_completed_txn_abort_locked(job
);
1178 job_completed_txn_success_locked(job
);
1183 * Useful only as a type shim for aio_bh_schedule_oneshot.
1184 * Called with job_mutex *not* held.
1186 static void job_exit(void *opaque
)
1188 Job
*job
= (Job
*)opaque
;
1192 job_ref_locked(job
);
1193 aio_context_acquire(job
->aio_context
);
1195 /* This is a lie, we're not quiescent, but still doing the completion
1196 * callbacks. However, completion callbacks tend to involve operations that
1197 * drain block nodes, and if .drained_poll still returned true, we would
1200 job_event_idle_locked(job
);
1202 job_completed_locked(job
);
1205 * Note that calling job_completed can move the job to a different
1206 * aio_context, so we cannot cache from above. job_txn_apply takes care of
1207 * acquiring the new lock, and we ref/unref to avoid job_completed freeing
1208 * the job underneath us.
1210 ctx
= job
->aio_context
;
1211 job_unref_locked(job
);
1212 aio_context_release(ctx
);
1216 * All jobs must allow a pause point before entering their job proper. This
1217 * ensures that jobs can be paused prior to being started, then resumed later.
1219 static void coroutine_fn
job_co_entry(void *opaque
)
1224 assert(job
&& job
->driver
&& job
->driver
->run
);
1225 WITH_JOB_LOCK_GUARD() {
1226 assert(job
->aio_context
== qemu_get_current_aio_context());
1227 job_pause_point_locked(job
);
1229 ret
= job
->driver
->run(job
, &job
->err
);
1230 WITH_JOB_LOCK_GUARD() {
1232 job
->deferred_to_main_loop
= true;
1235 aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit
, job
);
1238 void job_start(Job
*job
)
1240 assert(qemu_in_main_thread());
1242 WITH_JOB_LOCK_GUARD() {
1243 assert(job
&& !job_started_locked(job
) && job
->paused
&&
1244 job
->driver
&& job
->driver
->run
);
1245 job
->co
= qemu_coroutine_create(job_co_entry
, job
);
1248 job
->paused
= false;
1249 job_state_transition_locked(job
, JOB_STATUS_RUNNING
);
1251 aio_co_enter(job
->aio_context
, job
->co
);
1254 void job_cancel_locked(Job
*job
, bool force
)
1256 if (job
->status
== JOB_STATUS_CONCLUDED
) {
1257 job_do_dismiss_locked(job
);
1260 job_cancel_async_locked(job
, force
);
1261 if (!job_started_locked(job
)) {
1262 job_completed_locked(job
);
1263 } else if (job
->deferred_to_main_loop
) {
1265 * job_cancel_async() ignores soft-cancel requests for jobs
1266 * that are already done (i.e. deferred to the main loop). We
1267 * have to check again whether the job is really cancelled.
1268 * (job_cancel_requested() and job_is_cancelled() are equivalent
1269 * here, because job_cancel_async() will make soft-cancel
1270 * requests no-ops when deferred_to_main_loop is true. We
1271 * choose to call job_is_cancelled() to show that we invoke
1272 * job_completed_txn_abort() only for force-cancelled jobs.)
1274 if (job_is_cancelled_locked(job
)) {
1275 job_completed_txn_abort_locked(job
);
1278 job_enter_cond_locked(job
, NULL
);
1282 void job_cancel(Job
*job
, bool force
)
1285 job_cancel_locked(job
, force
);
1288 void job_user_cancel_locked(Job
*job
, bool force
, Error
**errp
)
1290 if (job_apply_verb_locked(job
, JOB_VERB_CANCEL
, errp
)) {
1293 job_cancel_locked(job
, force
);
1296 void job_user_cancel(Job
*job
, bool force
, Error
**errp
)
1299 job_user_cancel_locked(job
, force
, errp
);
1302 /* A wrapper around job_cancel() taking an Error ** parameter so it may be
1303 * used with job_finish_sync() without the need for (rather nasty) function
1304 * pointer casts there.
1306 * Called with job_mutex held.
1308 static void job_cancel_err_locked(Job
*job
, Error
**errp
)
1310 job_cancel_locked(job
, false);
1314 * Same as job_cancel_err(), but force-cancel.
1315 * Called with job_mutex held.
1317 static void job_force_cancel_err_locked(Job
*job
, Error
**errp
)
1319 job_cancel_locked(job
, true);
1322 int job_cancel_sync_locked(Job
*job
, bool force
)
1325 return job_finish_sync_locked(job
, &job_force_cancel_err_locked
, NULL
);
1327 return job_finish_sync_locked(job
, &job_cancel_err_locked
, NULL
);
1331 int job_cancel_sync(Job
*job
, bool force
)
1334 return job_cancel_sync_locked(job
, force
);
1337 void job_cancel_sync_all(void)
1340 AioContext
*aio_context
;
1343 while ((job
= job_next_locked(NULL
))) {
1344 aio_context
= job
->aio_context
;
1345 aio_context_acquire(aio_context
);
1346 job_cancel_sync_locked(job
, true);
1347 aio_context_release(aio_context
);
1351 int job_complete_sync_locked(Job
*job
, Error
**errp
)
1353 return job_finish_sync_locked(job
, job_complete_locked
, errp
);
1356 int job_complete_sync(Job
*job
, Error
**errp
)
1359 return job_complete_sync_locked(job
, errp
);
1362 void job_complete_locked(Job
*job
, Error
**errp
)
1364 /* Should not be reachable via external interface for internal jobs */
1366 GLOBAL_STATE_CODE();
1367 if (job_apply_verb_locked(job
, JOB_VERB_COMPLETE
, errp
)) {
1370 if (job_cancel_requested_locked(job
) || !job
->driver
->complete
) {
1371 error_setg(errp
, "The active block job '%s' cannot be completed",
1377 job
->driver
->complete(job
, errp
);
1381 void job_complete(Job
*job
, Error
**errp
)
1384 job_complete_locked(job
, errp
);
1387 int job_finish_sync_locked(Job
*job
,
1388 void (*finish
)(Job
*, Error
**errp
),
1391 Error
*local_err
= NULL
;
1393 GLOBAL_STATE_CODE();
1395 job_ref_locked(job
);
1398 finish(job
, &local_err
);
1401 error_propagate(errp
, local_err
);
1402 job_unref_locked(job
);
1407 AIO_WAIT_WHILE(job
->aio_context
,
1408 (job_enter(job
), !job_is_completed(job
)));
1411 ret
= (job_is_cancelled_locked(job
) && job
->ret
== 0)
1412 ? -ECANCELED
: job
->ret
;
1413 job_unref_locked(job
);
1417 int job_finish_sync(Job
*job
, void (*finish
)(Job
*, Error
**errp
), Error
**errp
)
1420 return job_finish_sync_locked(job
, finish
, errp
);