blockjob: rename notifier callbacks as _locked

/*
 * Background jobs (long-running operations)
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012, 2018 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu/job.h"
#include "qemu/id.h"
#include "qemu/main-loop.h"
#include "block/aio-wait.h"
#include "trace/trace-root.h"
#include "qapi/qapi-events-job.h"

/*
 * The job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor. The monitor is
 * peculiar in that it accesses the job list with job_get, and
 * therefore needs consistency across job_get and the actual operation
 * (e.g. job_user_cancel). To achieve this consistency, the caller
 * calls job_lock/job_unlock itself around the whole operation.
 *
 * The second includes functions used by the job drivers and sometimes
 * by the core block layer. These delegate the locking to the callee instead.
 *
 * TODO Actually make this true
 */
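
/*
 * For instance, a monitor-initiated cancel follows the first pattern
 * (a minimal sketch using names from this file; the surrounding QMP
 * plumbing is assumed):
 *
 *     JOB_LOCK_GUARD();
 *     Job *job = job_get_locked(id);
 *     if (job) {
 *         job_user_cancel_locked(job, force, errp);
 *     }
 */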

/*
 * job_mutex protects the jobs list, but also makes the
 * struct job fields thread-safe.
 */
QemuMutex job_mutex;

/* Protected by job_mutex */
static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);

/* Job State Transition Table */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
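
/*
 * Reading the table above: the row is the current state, the column the
 * proposed new state. E.g. JobSTT[JOB_STATUS_CREATED][JOB_STATUS_RUNNING]
 * is 1, so job_start() may move a job from CREATED to RUNNING, while a
 * CONCLUDED job can only move on to NULL.
 */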

bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};

/* Transactional group of jobs */
struct JobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, Job) jobs;

    /* Reference count */
    int refcnt;
};

void job_lock(void)
{
    /* nop */
}

void job_unlock(void)
{
    /* nop */
}

static void real_job_lock(void)
{
    qemu_mutex_lock(&job_mutex);
}

static void real_job_unlock(void)
{
    qemu_mutex_unlock(&job_mutex);
}

static void __attribute__((__constructor__)) job_init(void)
{
    qemu_mutex_init(&job_mutex);
}

JobTxn *job_txn_new(void)
{
    JobTxn *txn = g_new0(JobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

/* Called with job_mutex held. */
static void job_txn_ref_locked(JobTxn *txn)
{
    txn->refcnt++;
}

void job_txn_unref_locked(JobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

void job_txn_unref(JobTxn *txn)
{
    JOB_LOCK_GUARD();
    job_txn_unref_locked(txn);
}

/**
 * @txn: The transaction (may be NULL)
 * @job: Job to add to the transaction
 *
 * Add @job to the transaction. The @job must not already be in a transaction.
 * The caller must call either job_txn_unref() or job_completed() to release
 * the reference that is automatically grabbed here.
 *
 * If @txn is NULL, the function does nothing.
 *
 * Called with job_mutex held.
 */
static void job_txn_add_job_locked(JobTxn *txn, Job *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    job_txn_ref_locked(txn);
}
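
/*
 * A sketch of how a caller typically groups jobs into one transaction
 * (hypothetical ids and driver; job_create() below performs the
 * job_txn_add_job_locked() call and takes its own reference):
 *
 *     JobTxn *txn = job_txn_new();
 *     job_create("job0", driver, txn, ctx, flags, cb, opaque, errp);
 *     job_create("job1", driver, txn, ctx, flags, cb, opaque, errp);
 *     job_txn_unref(txn);
 */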

/* Called with job_mutex held. */
static void job_txn_del_job_locked(Job *job)
{
    if (job->txn) {
        QLIST_REMOVE(job, txn_list);
        job_txn_unref_locked(job->txn);
        job->txn = NULL;
    }
}

/* Called with job_mutex held, but releases it temporarily. */
static int job_txn_apply_locked(Job *job, int fn(Job *))
{
    AioContext *inner_ctx;
    Job *other_job, *next;
    JobTxn *txn = job->txn;
    int rc = 0;

    /*
     * Similar to job_completed_txn_abort, we take each job's lock before
     * applying fn, but since we assume that outer_ctx is held by the caller,
     * we need to release it here to avoid holding the lock twice - which would
     * break AIO_WAIT_WHILE from within fn.
     */
    job_ref_locked(job);
    aio_context_release(job->aio_context);

    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        inner_ctx = other_job->aio_context;
        aio_context_acquire(inner_ctx);
        rc = fn(other_job);
        aio_context_release(inner_ctx);
        if (rc) {
            break;
        }
    }

    /*
     * Note that job->aio_context might have been changed by calling fn, so we
     * can't use a local variable to cache it.
     */
    aio_context_acquire(job->aio_context);
    job_unref_locked(job);
    return rc;
}

bool job_is_internal(Job *job)
{
    return (job->id == NULL);
}

/* Called with job_mutex held. */
static void job_state_transition_locked(Job *job, JobStatus s1)
{
    JobStatus s0 = job->status;
    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    trace_job_state_transition(job, job->ret,
                               JobSTT[s0][s1] ? "allowed" : "disallowed",
                               JobStatus_str(s0), JobStatus_str(s1));
    assert(JobSTT[s0][s1]);
    job->status = s1;

    if (!job_is_internal(job) && s1 != s0) {
        qapi_event_send_job_status_change(job->id, job->status);
    }
}

int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
{
    JobStatus s0 = job->status;
    assert(verb >= 0 && verb < JOB_VERB__MAX);
    trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
                         JobVerbTable[verb][s0] ? "allowed" : "prohibited");
    if (JobVerbTable[verb][s0]) {
        return 0;
    }
    error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
               job->id, JobStatus_str(s0), JobVerb_str(verb));
    return -EPERM;
}

int job_apply_verb(Job *job, JobVerb verb, Error **errp)
{
    JOB_LOCK_GUARD();
    return job_apply_verb_locked(job, verb, errp);
}

JobType job_type(const Job *job)
{
    return job->driver->job_type;
}

const char *job_type_str(const Job *job)
{
    return JobType_str(job_type(job));
}

bool job_is_cancelled_locked(Job *job)
{
    /* force_cancel may be true only if cancelled is true, too */
    assert(job->cancelled || !job->force_cancel);
    return job->force_cancel;
}

bool job_is_cancelled(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_cancelled_locked(job);
}

/* Called with job_mutex held. */
static bool job_cancel_requested_locked(Job *job)
{
    return job->cancelled;
}

bool job_cancel_requested(Job *job)
{
    JOB_LOCK_GUARD();
    return job_cancel_requested_locked(job);
}

bool job_is_ready_locked(Job *job)
{
    switch (job->status) {
    case JOB_STATUS_UNDEFINED:
    case JOB_STATUS_CREATED:
    case JOB_STATUS_RUNNING:
    case JOB_STATUS_PAUSED:
    case JOB_STATUS_WAITING:
    case JOB_STATUS_PENDING:
    case JOB_STATUS_ABORTING:
    case JOB_STATUS_CONCLUDED:
    case JOB_STATUS_NULL:
        return false;
    case JOB_STATUS_READY:
    case JOB_STATUS_STANDBY:
        return true;
    default:
        g_assert_not_reached();
    }
    return false;
}

bool job_is_ready(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_ready_locked(job);
}

bool job_is_completed_locked(Job *job)
{
    switch (job->status) {
    case JOB_STATUS_UNDEFINED:
    case JOB_STATUS_CREATED:
    case JOB_STATUS_RUNNING:
    case JOB_STATUS_PAUSED:
    case JOB_STATUS_READY:
    case JOB_STATUS_STANDBY:
        return false;
    case JOB_STATUS_WAITING:
    case JOB_STATUS_PENDING:
    case JOB_STATUS_ABORTING:
    case JOB_STATUS_CONCLUDED:
    case JOB_STATUS_NULL:
        return true;
    default:
        g_assert_not_reached();
    }
    return false;
}

bool job_is_completed(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_completed_locked(job);
}

static bool job_started_locked(Job *job)
{
    return job->co;
}

/* Called with job_mutex held. */
static bool job_should_pause_locked(Job *job)
{
    return job->pause_count > 0;
}

Job *job_next_locked(Job *job)
{
    if (!job) {
        return QLIST_FIRST(&jobs);
    }
    return QLIST_NEXT(job, job_list);
}

Job *job_next(Job *job)
{
    JOB_LOCK_GUARD();
    return job_next_locked(job);
}

Job *job_get_locked(const char *id)
{
    Job *job;

    QLIST_FOREACH(job, &jobs, job_list) {
        if (job->id && !strcmp(id, job->id)) {
            return job;
        }
    }

    return NULL;
}

Job *job_get(const char *id)
{
    JOB_LOCK_GUARD();
    return job_get_locked(id);
}

void job_set_aio_context(Job *job, AioContext *ctx)
{
    /* protect against read in job_finish_sync_locked and job_start */
    GLOBAL_STATE_CODE();
    /* protect against read in job_do_yield_locked */
    JOB_LOCK_GUARD();
    /* ensure the job is quiescent while the AioContext is changed */
    assert(job->paused || job_is_completed_locked(job));
    job->aio_context = ctx;
}

/* Called with job_mutex *not* held. */
static void job_sleep_timer_cb(void *opaque)
{
    Job *job = opaque;

    job_enter(job);
}

void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
                 void *opaque, Error **errp)
{
    Job *job;

    JOB_LOCK_GUARD();

    if (job_id) {
        if (flags & JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal job");
            return NULL;
        }
        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }
        if (job_get_locked(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    } else if (!(flags & JOB_INTERNAL)) {
        error_setg(errp, "An explicit job ID is required");
        return NULL;
    }

    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->refcnt        = 1;
    job->aio_context   = ctx;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    job->cb            = cb;
    job->opaque        = opaque;

    progress_init(&job->progress);

    notifier_list_init(&job->on_finalize_cancelled);
    notifier_list_init(&job->on_finalize_completed);
    notifier_list_init(&job->on_pending);
    notifier_list_init(&job->on_ready);
    notifier_list_init(&job->on_idle);

    job_state_transition_locked(job, JOB_STATUS_CREATED);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   job_sleep_timer_cb, job);

    QLIST_INSERT_HEAD(&jobs, job, job_list);

    /* Single jobs are modeled as single-job transactions for sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = job_txn_new();
        job_txn_add_job_locked(txn, job);
        job_txn_unref_locked(txn);
    } else {
        job_txn_add_job_locked(txn, job);
    }

    return job;
}
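
/*
 * A minimal sketch of driver-side creation and start-up (the driver
 * struct and run function here are hypothetical; block jobs go through
 * block_job_create() instead):
 *
 *     static const JobDriver example_driver = {
 *         .instance_size = sizeof(Job),
 *         .job_type      = JOB_TYPE_CREATE,
 *         .run           = example_run,
 *     };
 *
 *     Job *job = job_create("example0", &example_driver, NULL, ctx,
 *                           JOB_DEFAULT, NULL, NULL, errp);
 *     if (job) {
 *         job_start(job);
 *     }
 */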

void job_ref_locked(Job *job)
{
    ++job->refcnt;
}

void job_ref(Job *job)
{
    JOB_LOCK_GUARD();
    job_ref_locked(job);
}

void job_unref_locked(Job *job)
{
    GLOBAL_STATE_CODE();

    if (--job->refcnt == 0) {
        assert(job->status == JOB_STATUS_NULL);
        assert(!timer_pending(&job->sleep_timer));
        assert(!job->txn);

        if (job->driver->free) {
            job_unlock();
            job->driver->free(job);
            job_lock();
        }

        QLIST_REMOVE(job, job_list);

        progress_destroy(&job->progress);
        error_free(job->err);
        g_free(job->id);
        g_free(job);
    }
}

void job_unref(Job *job)
{
    JOB_LOCK_GUARD();
    job_unref_locked(job);
}

void job_progress_update(Job *job, uint64_t done)
{
    progress_work_done(&job->progress, done);
}

void job_progress_set_remaining(Job *job, uint64_t remaining)
{
    progress_set_remaining(&job->progress, remaining);
}

void job_progress_increase_remaining(Job *job, uint64_t delta)
{
    progress_increase_remaining(&job->progress, delta);
}

/**
 * To be called when a cancelled job is finalised.
 * Called with job_mutex held.
 */
static void job_event_cancelled_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_cancelled, job);
}

/**
 * To be called when a successfully completed job is finalised.
 * Called with job_mutex held.
 */
static void job_event_completed_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_completed, job);
}

/* Called with job_mutex held. */
static void job_event_pending_locked(Job *job)
{
    notifier_list_notify(&job->on_pending, job);
}

/* Called with job_mutex held. */
static void job_event_ready_locked(Job *job)
{
    notifier_list_notify(&job->on_ready, job);
}

/* Called with job_mutex held. */
static void job_event_idle_locked(Job *job)
{
    notifier_list_notify(&job->on_idle, job);
}

void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
{
    if (!job_started_locked(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    real_job_lock();
    if (job->busy) {
        real_job_unlock();
        return;
    }

    if (fn && !fn(job)) {
        real_job_unlock();
        return;
    }

    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    job->busy = true;
    real_job_unlock();
    job_unlock();
    aio_co_wake(job->co);
    job_lock();
}

void job_enter_cond(Job *job, bool(*fn)(Job *job))
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, fn);
}

void job_enter(Job *job)
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, NULL);
}

/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with job_enter() before the timer has expired
 * is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
 * called explicitly.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
    AioContext *next_aio_context;

    real_job_lock();
    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    job_event_idle_locked(job);
    real_job_unlock();
    job_unlock();
    qemu_coroutine_yield();
    job_lock();

    next_aio_context = job->aio_context;
    /*
     * The coroutine has resumed, but in the meanwhile the job's AioContext
     * might have changed via bdrv_try_set_aio_context(), so we need to move
     * the coroutine to the new AioContext as well.
     */
    while (qemu_get_current_aio_context() != next_aio_context) {
        job_unlock();
        aio_co_reschedule_self(next_aio_context);
        job_lock();
        next_aio_context = job->aio_context;
    }

    /* Set by job_enter_cond_locked() before re-entering the coroutine. */
    assert(job->busy);
}

/* Called with job_mutex held, but releases it temporarily. */
static void coroutine_fn job_pause_point_locked(Job *job)
{
    assert(job && job_started_locked(job));

    if (!job_should_pause_locked(job)) {
        return;
    }
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (job->driver->pause) {
        job_unlock();
        job->driver->pause(job);
        job_lock();
    }

    if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
        JobStatus status = job->status;
        job_state_transition_locked(job, status == JOB_STATUS_READY
                                         ? JOB_STATUS_STANDBY
                                         : JOB_STATUS_PAUSED);
        job->paused = true;
        job_do_yield_locked(job, -1);
        job->paused = false;
        job_state_transition_locked(job, status);
    }

    if (job->driver->resume) {
        job_unlock();
        job->driver->resume(job);
        job_lock();
    }
}

void coroutine_fn job_pause_point(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_point_locked(job);
}

static void coroutine_fn job_yield_locked(Job *job)
{
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too! */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, -1);
    }

    job_pause_point_locked(job);
}

void coroutine_fn job_yield(Job *job)
{
    JOB_LOCK_GUARD();
    job_yield_locked(job);
}

void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too! */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    job_pause_point_locked(job);
}
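
/*
 * Typical shape of a driver's .run() coroutine built from the helpers
 * above (a sketch, not a real driver; more_work and delay_ns are
 * placeholders):
 *
 *     static int coroutine_fn example_run(Job *job, Error **errp)
 *     {
 *         while (more_work && !job_is_cancelled(job)) {
 *             ...do one unit of work...
 *             job_progress_update(job, 1);
 *             job_sleep_ns(job, delay_ns);   // also serves as pause point
 *         }
 *         return 0;
 *     }
 */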

/* Assumes the job_mutex is held */
static bool job_timer_not_pending_locked(Job *job)
{
    return !timer_pending(&job->sleep_timer);
}

void job_pause_locked(Job *job)
{
    job->pause_count++;
    if (!job->paused) {
        job_enter_cond_locked(job, NULL);
    }
}

void job_pause(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_locked(job);
}

void job_resume_locked(Job *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }

    /* kick only if no timer is pending */
    job_enter_cond_locked(job, job_timer_not_pending_locked);
}

void job_resume(Job *job)
{
    JOB_LOCK_GUARD();
    job_resume_locked(job);
}

void job_user_pause_locked(Job *job, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    job_pause_locked(job);
}

void job_user_pause(Job *job, Error **errp)
{
    JOB_LOCK_GUARD();
    job_user_pause_locked(job, errp);
}

bool job_user_paused_locked(Job *job)
{
    return job->user_paused;
}

bool job_user_paused(Job *job)
{
    JOB_LOCK_GUARD();
    return job_user_paused_locked(job);
}

void job_user_resume_locked(Job *job, Error **errp)
{
    assert(job);
    GLOBAL_STATE_CODE();
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
        return;
    }
    if (job->driver->user_resume) {
        job_unlock();
        job->driver->user_resume(job);
        job_lock();
    }
    job->user_paused = false;
    job_resume_locked(job);
}

void job_user_resume(Job *job, Error **errp)
{
    JOB_LOCK_GUARD();
    job_user_resume_locked(job, errp);
}

/* Called with job_mutex held, but releases it temporarily. */
static void job_do_dismiss_locked(Job *job)
{
    assert(job);
    job->busy = false;
    job->paused = false;
    job->deferred_to_main_loop = true;

    job_txn_del_job_locked(job);

    job_state_transition_locked(job, JOB_STATUS_NULL);
    job_unref_locked(job);
}

void job_dismiss_locked(Job **jobptr, Error **errp)
{
    Job *job = *jobptr;
    /* similarly to _complete, this is QMP-interface only. */
    assert(job->id);
    if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    job_do_dismiss_locked(job);
    *jobptr = NULL;
}

void job_dismiss(Job **jobptr, Error **errp)
{
    JOB_LOCK_GUARD();
    job_dismiss_locked(jobptr, errp);
}

void job_early_fail(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->status == JOB_STATUS_CREATED);
    job_do_dismiss_locked(job);
}

/* Called with job_mutex held. */
static void job_conclude_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !job_started_locked(job)) {
        job_do_dismiss_locked(job);
    }
}

/* Called with job_mutex held. */
static void job_update_rc_locked(Job *job)
{
    if (!job->ret && job_is_cancelled_locked(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        if (!job->err) {
            error_setg(&job->err, "%s", strerror(-job->ret));
        }
        job_state_transition_locked(job, JOB_STATUS_ABORTING);
    }
}

static void job_commit(Job *job)
{
    assert(!job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

static void job_abort(Job *job)
{
    assert(job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

static void job_clean(Job *job)
{
    GLOBAL_STATE_CODE();
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}

/* Called with job_mutex held, but releases it temporarily */
static int job_finalize_single_locked(Job *job)
{
    int job_ret;

    assert(job_is_completed_locked(job));

    /* Ensure abort is called for late-transactional failures */
    job_update_rc_locked(job);

    job_ret = job->ret;
    job_unlock();

    if (!job_ret) {
        job_commit(job);
    } else {
        job_abort(job);
    }
    job_clean(job);

    job_lock();

    if (job->cb) {
        job_ret = job->ret;
        job_unlock();
        job->cb(job->opaque, job_ret);
        job_lock();
    }

    /* Emit events only if we actually started */
    if (job_started_locked(job)) {
        if (job_is_cancelled_locked(job)) {
            job_event_cancelled_locked(job);
        } else {
            job_event_completed_locked(job);
        }
    }

    job_txn_del_job_locked(job);
    job_conclude_locked(job);
    return 0;
}

/* Called with job_mutex held, but releases it temporarily */
static void job_cancel_async_locked(Job *job, bool force)
{
    GLOBAL_STATE_CODE();
    if (job->driver->cancel) {
        job_unlock();
        force = job->driver->cancel(job, force);
        job_lock();
    } else {
        /* No .cancel() means the job will behave as if force-cancelled */
        force = true;
    }

    if (job->user_paused) {
        /* Do not call job_enter here, the caller will handle it. */
        if (job->driver->user_resume) {
            job_unlock();
            job->driver->user_resume(job);
            job_lock();
        }
        job->user_paused = false;
        assert(job->pause_count > 0);
        job->pause_count--;
    }

    /*
     * Ignore soft cancel requests after the job is already done
     * (We will still invoke job->driver->cancel() above, but if the
     * job driver supports soft cancelling and the job is done, that
     * should be a no-op, too. We still call it so it can override
     * @force.)
     */
    if (force || !job->deferred_to_main_loop) {
        job->cancelled = true;
        /* To prevent 'force == false' overriding a previous 'force == true' */
        job->force_cancel |= force;
    }
}

/* Called with job_mutex held, but releases it temporarily. */
static void job_completed_txn_abort_locked(Job *job)
{
    AioContext *ctx;
    JobTxn *txn = job->txn;
    Job *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    job_txn_ref_locked(txn);

    /*
     * We can only hold the single job's AioContext lock while calling
     * job_finalize_single() because the finalization callbacks can involve
     * calls of AIO_WAIT_WHILE(), which could deadlock otherwise.
     * Note that the job's AioContext may change when it is finalized.
     */
    job_ref_locked(job);
    aio_context_release(job->aio_context);

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            ctx = other_job->aio_context;
            aio_context_acquire(ctx);
            /*
             * This is a transaction: If one job failed, no result will matter.
             * Therefore, pass force=true to terminate all other jobs as quickly
             * as possible.
             */
            job_cancel_async_locked(other_job, true);
            aio_context_release(ctx);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        /*
         * The job's AioContext may change, so store it in @ctx so we
         * release the same context that we have acquired before.
         */
        ctx = other_job->aio_context;
        aio_context_acquire(ctx);
        if (!job_is_completed_locked(other_job)) {
            assert(job_cancel_requested_locked(other_job));
            job_finish_sync_locked(other_job, NULL, NULL);
        }
        job_finalize_single_locked(other_job);
        aio_context_release(ctx);
    }

    /*
     * Use job_ref()/job_unref() so we can read the AioContext here
     * even if the job went away during job_finalize_single().
     */
    aio_context_acquire(job->aio_context);
    job_unref_locked(job);

    job_txn_unref_locked(txn);
}

/* Called with job_mutex held, but releases it temporarily */
static int job_prepare_locked(Job *job)
{
    int ret;

    GLOBAL_STATE_CODE();
    if (job->ret == 0 && job->driver->prepare) {
        job_unlock();
        ret = job->driver->prepare(job);
        job_lock();
        job->ret = ret;
        job_update_rc_locked(job);
    }
    return job->ret;
}

/* Called with job_mutex held */
static int job_needs_finalize_locked(Job *job)
{
    return !job->auto_finalize;
}

/* Called with job_mutex held */
static void job_do_finalize_locked(Job *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = job_txn_apply_locked(job, job_prepare_locked);
    if (rc) {
        job_completed_txn_abort_locked(job);
    } else {
        job_txn_apply_locked(job, job_finalize_single_locked);
    }
}

void job_finalize_locked(Job *job, Error **errp)
{
    assert(job && job->id);
    if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    job_do_finalize_locked(job);
}

void job_finalize(Job *job, Error **errp)
{
    JOB_LOCK_GUARD();
    job_finalize_locked(job, errp);
}

/* Called with job_mutex held. */
static int job_transition_to_pending_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_PENDING);
    if (!job->auto_finalize) {
        job_event_pending_locked(job);
    }
    return 0;
}

void job_transition_to_ready(Job *job)
{
    JOB_LOCK_GUARD();
    job_state_transition_locked(job, JOB_STATUS_READY);
    job_event_ready_locked(job);
}

/* Called with job_mutex held. */
static void job_completed_txn_success_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    job_state_transition_locked(job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!job_is_completed_locked(other_job)) {
            return;
        }
        assert(other_job->ret == 0);
    }

    job_txn_apply_locked(job, job_transition_to_pending_locked);

    /* If no jobs need manual finalization, automatically do so */
    if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
        job_do_finalize_locked(job);
    }
}

/* Called with job_mutex held. */
static void job_completed_locked(Job *job)
{
    assert(job && job->txn && !job_is_completed_locked(job));

    job_update_rc_locked(job);
    trace_job_completed(job, job->ret);
    if (job->ret) {
        job_completed_txn_abort_locked(job);
    } else {
        job_completed_txn_success_locked(job);
    }
}

/**
 * Useful only as a type shim for aio_bh_schedule_oneshot.
 * Called with job_mutex *not* held.
 */
static void job_exit(void *opaque)
{
    Job *job = (Job *)opaque;
    AioContext *ctx;
    JOB_LOCK_GUARD();

    job_ref_locked(job);
    aio_context_acquire(job->aio_context);

    /* This is a lie, we're not quiescent, but still doing the completion
     * callbacks. However, completion callbacks tend to involve operations that
     * drain block nodes, and if .drained_poll still returned true, we would
     * deadlock. */
    job->busy = false;
    job_event_idle_locked(job);

    job_completed_locked(job);

    /*
     * Note that calling job_completed can move the job to a different
     * aio_context, so we cannot cache from above. job_txn_apply takes care of
     * acquiring the new lock, and we ref/unref to avoid job_completed freeing
     * the job underneath us.
     */
    ctx = job->aio_context;
    job_unref_locked(job);
    aio_context_release(ctx);
}

/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn job_co_entry(void *opaque)
{
    Job *job = opaque;
    int ret;

    assert(job && job->driver && job->driver->run);
    WITH_JOB_LOCK_GUARD() {
        assert(job->aio_context == qemu_get_current_aio_context());
        job_pause_point_locked(job);
    }
    ret = job->driver->run(job, &job->err);
    WITH_JOB_LOCK_GUARD() {
        job->ret = ret;
        job->deferred_to_main_loop = true;
        job->busy = true;
    }
    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}

void job_start(Job *job)
{
    assert(qemu_in_main_thread());

    WITH_JOB_LOCK_GUARD() {
        assert(job && !job_started_locked(job) && job->paused &&
               job->driver && job->driver->run);
        job->co = qemu_coroutine_create(job_co_entry, job);
        job->pause_count--;
        job->busy = true;
        job->paused = false;
        job_state_transition_locked(job, JOB_STATUS_RUNNING);
    }
    aio_co_enter(job->aio_context, job->co);
}

void job_cancel_locked(Job *job, bool force)
{
    if (job->status == JOB_STATUS_CONCLUDED) {
        job_do_dismiss_locked(job);
        return;
    }
    job_cancel_async_locked(job, force);
    if (!job_started_locked(job)) {
        job_completed_locked(job);
    } else if (job->deferred_to_main_loop) {
        /*
         * job_cancel_async() ignores soft-cancel requests for jobs
         * that are already done (i.e. deferred to the main loop). We
         * have to check again whether the job is really cancelled.
         * (job_cancel_requested() and job_is_cancelled() are equivalent
         * here, because job_cancel_async() will make soft-cancel
         * requests no-ops when deferred_to_main_loop is true. We
         * choose to call job_is_cancelled() to show that we invoke
         * job_completed_txn_abort() only for force-cancelled jobs.)
         */
        if (job_is_cancelled_locked(job)) {
            job_completed_txn_abort_locked(job);
        }
    } else {
        job_enter_cond_locked(job, NULL);
    }
}

void job_cancel(Job *job, bool force)
{
    JOB_LOCK_GUARD();
    job_cancel_locked(job, force);
}

void job_user_cancel_locked(Job *job, bool force, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
        return;
    }
    job_cancel_locked(job, force);
}

void job_user_cancel(Job *job, bool force, Error **errp)
{
    JOB_LOCK_GUARD();
    job_user_cancel_locked(job, force, errp);
}

/* A wrapper around job_cancel() taking an Error ** parameter so it may be
 * used with job_finish_sync() without the need for (rather nasty) function
 * pointer casts there.
 *
 * Called with job_mutex held.
 */
static void job_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, false);
}

/**
 * Same as job_cancel_err(), but force-cancel.
 * Called with job_mutex held.
 */
static void job_force_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, true);
}

int job_cancel_sync_locked(Job *job, bool force)
{
    if (force) {
        return job_finish_sync_locked(job, &job_force_cancel_err_locked, NULL);
    } else {
        return job_finish_sync_locked(job, &job_cancel_err_locked, NULL);
    }
}

int job_cancel_sync(Job *job, bool force)
{
    JOB_LOCK_GUARD();
    return job_cancel_sync_locked(job, force);
}

void job_cancel_sync_all(void)
{
    Job *job;
    AioContext *aio_context;
    JOB_LOCK_GUARD();

    while ((job = job_next_locked(NULL))) {
        aio_context = job->aio_context;
        aio_context_acquire(aio_context);
        job_cancel_sync_locked(job, true);
        aio_context_release(aio_context);
    }
}

int job_complete_sync_locked(Job *job, Error **errp)
{
    return job_finish_sync_locked(job, job_complete_locked, errp);
}

int job_complete_sync(Job *job, Error **errp)
{
    JOB_LOCK_GUARD();
    return job_complete_sync_locked(job, errp);
}

void job_complete_locked(Job *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    GLOBAL_STATE_CODE();
    if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job_cancel_requested_locked(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    job_unlock();
    job->driver->complete(job, errp);
    job_lock();
}

void job_complete(Job *job, Error **errp)
{
    JOB_LOCK_GUARD();
    job_complete_locked(job, errp);
}

int job_finish_sync_locked(Job *job,
                           void (*finish)(Job *, Error **errp),
                           Error **errp)
{
    Error *local_err = NULL;
    int ret;
    GLOBAL_STATE_CODE();

    job_ref_locked(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref_locked(job);
        return -EBUSY;
    }

    job_unlock();
    AIO_WAIT_WHILE(job->aio_context,
                   (job_enter(job), !job_is_completed(job)));
    job_lock();

    ret = (job_is_cancelled_locked(job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref_locked(job);
    return ret;
}

int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
{
    JOB_LOCK_GUARD();
    return job_finish_sync_locked(job, finish, errp);
}