OSDN Git Service

job.c: make job_mutex and job_lock/unlock() public
[qmiga/qemu.git] / job.c
1 /*
2  * Background jobs (long-running operations)
3  *
4  * Copyright (c) 2011 IBM Corp.
5  * Copyright (c) 2012, 2018 Red Hat, Inc.
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a copy
8  * of this software and associated documentation files (the "Software"), to deal
9  * in the Software without restriction, including without limitation the rights
10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11  * copies of the Software, and to permit persons to whom the Software is
12  * furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23  * THE SOFTWARE.
24  */
25
26 #include "qemu/osdep.h"
27 #include "qapi/error.h"
28 #include "qemu/job.h"
29 #include "qemu/id.h"
30 #include "qemu/main-loop.h"
31 #include "block/aio-wait.h"
32 #include "trace/trace-root.h"
33 #include "qapi/qapi-events-job.h"
34
35 /*
36  * job_mutex protects the jobs list, but also makes the
37  * struct job fields thread-safe.
38  */
39 QemuMutex job_mutex;
40
41 static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
42
43 /* Job State Transition Table */
44 bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
45                                     /* U, C, R, P, Y, S, W, D, X, E, N */
46     /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
47     /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
48     /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
49     /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
50     /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
51     /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
52     /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
53     /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
54     /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
55     /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
56     /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
57 };
58
59 bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
60                                     /* U, C, R, P, Y, S, W, D, X, E, N */
61     [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
62     [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
63     [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
64     [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
65     [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
66     [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
67     [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
68 };
69
70 /* Transactional group of jobs */
71 struct JobTxn {
72
73     /* Is this txn being cancelled? */
74     bool aborting;
75
76     /* List of jobs */
77     QLIST_HEAD(, Job) jobs;
78
79     /* Reference count */
80     int refcnt;
81 };
82
83 void job_lock(void)
84 {
85     /* nop */
86 }
87
88 void job_unlock(void)
89 {
90     /* nop */
91 }
92
93 static void real_job_lock(void)
94 {
95     qemu_mutex_lock(&job_mutex);
96 }
97
98 static void real_job_unlock(void)
99 {
100     qemu_mutex_unlock(&job_mutex);
101 }
102
103 static void __attribute__((__constructor__)) job_init(void)
104 {
105     qemu_mutex_init(&job_mutex);
106 }
107
108 JobTxn *job_txn_new(void)
109 {
110     JobTxn *txn = g_new0(JobTxn, 1);
111     QLIST_INIT(&txn->jobs);
112     txn->refcnt = 1;
113     return txn;
114 }
115
116 static void job_txn_ref(JobTxn *txn)
117 {
118     txn->refcnt++;
119 }
120
121 void job_txn_unref(JobTxn *txn)
122 {
123     if (txn && --txn->refcnt == 0) {
124         g_free(txn);
125     }
126 }
127
128 void job_txn_add_job(JobTxn *txn, Job *job)
129 {
130     if (!txn) {
131         return;
132     }
133
134     assert(!job->txn);
135     job->txn = txn;
136
137     QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
138     job_txn_ref(txn);
139 }
140
141 static void job_txn_del_job(Job *job)
142 {
143     if (job->txn) {
144         QLIST_REMOVE(job, txn_list);
145         job_txn_unref(job->txn);
146         job->txn = NULL;
147     }
148 }
149
150 static int job_txn_apply(Job *job, int fn(Job *))
151 {
152     AioContext *inner_ctx;
153     Job *other_job, *next;
154     JobTxn *txn = job->txn;
155     int rc = 0;
156
157     /*
158      * Similar to job_completed_txn_abort, we take each job's lock before
159      * applying fn, but since we assume that outer_ctx is held by the caller,
160      * we need to release it here to avoid holding the lock twice - which would
161      * break AIO_WAIT_WHILE from within fn.
162      */
163     job_ref(job);
164     aio_context_release(job->aio_context);
165
166     QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
167         inner_ctx = other_job->aio_context;
168         aio_context_acquire(inner_ctx);
169         rc = fn(other_job);
170         aio_context_release(inner_ctx);
171         if (rc) {
172             break;
173         }
174     }
175
176     /*
177      * Note that job->aio_context might have been changed by calling fn, so we
178      * can't use a local variable to cache it.
179      */
180     aio_context_acquire(job->aio_context);
181     job_unref(job);
182     return rc;
183 }
184
185 bool job_is_internal(Job *job)
186 {
187     return (job->id == NULL);
188 }
189
190 static void job_state_transition(Job *job, JobStatus s1)
191 {
192     JobStatus s0 = job->status;
193     assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
194     trace_job_state_transition(job, job->ret,
195                                JobSTT[s0][s1] ? "allowed" : "disallowed",
196                                JobStatus_str(s0), JobStatus_str(s1));
197     assert(JobSTT[s0][s1]);
198     job->status = s1;
199
200     if (!job_is_internal(job) && s1 != s0) {
201         qapi_event_send_job_status_change(job->id, job->status);
202     }
203 }
204
205 int job_apply_verb(Job *job, JobVerb verb, Error **errp)
206 {
207     JobStatus s0 = job->status;
208     assert(verb >= 0 && verb < JOB_VERB__MAX);
209     trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
210                          JobVerbTable[verb][s0] ? "allowed" : "prohibited");
211     if (JobVerbTable[verb][s0]) {
212         return 0;
213     }
214     error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
215                job->id, JobStatus_str(s0), JobVerb_str(verb));
216     return -EPERM;
217 }
218
219 JobType job_type(const Job *job)
220 {
221     return job->driver->job_type;
222 }
223
224 const char *job_type_str(const Job *job)
225 {
226     return JobType_str(job_type(job));
227 }
228
229 bool job_is_cancelled(Job *job)
230 {
231     /* force_cancel may be true only if cancelled is true, too */
232     assert(job->cancelled || !job->force_cancel);
233     return job->force_cancel;
234 }
235
236 bool job_cancel_requested(Job *job)
237 {
238     return job->cancelled;
239 }
240
241 bool job_is_ready(Job *job)
242 {
243     switch (job->status) {
244     case JOB_STATUS_UNDEFINED:
245     case JOB_STATUS_CREATED:
246     case JOB_STATUS_RUNNING:
247     case JOB_STATUS_PAUSED:
248     case JOB_STATUS_WAITING:
249     case JOB_STATUS_PENDING:
250     case JOB_STATUS_ABORTING:
251     case JOB_STATUS_CONCLUDED:
252     case JOB_STATUS_NULL:
253         return false;
254     case JOB_STATUS_READY:
255     case JOB_STATUS_STANDBY:
256         return true;
257     default:
258         g_assert_not_reached();
259     }
260     return false;
261 }
262
263 bool job_is_completed(Job *job)
264 {
265     switch (job->status) {
266     case JOB_STATUS_UNDEFINED:
267     case JOB_STATUS_CREATED:
268     case JOB_STATUS_RUNNING:
269     case JOB_STATUS_PAUSED:
270     case JOB_STATUS_READY:
271     case JOB_STATUS_STANDBY:
272         return false;
273     case JOB_STATUS_WAITING:
274     case JOB_STATUS_PENDING:
275     case JOB_STATUS_ABORTING:
276     case JOB_STATUS_CONCLUDED:
277     case JOB_STATUS_NULL:
278         return true;
279     default:
280         g_assert_not_reached();
281     }
282     return false;
283 }
284
285 static bool job_started(Job *job)
286 {
287     return job->co;
288 }
289
290 static bool job_should_pause(Job *job)
291 {
292     return job->pause_count > 0;
293 }
294
295 Job *job_next(Job *job)
296 {
297     if (!job) {
298         return QLIST_FIRST(&jobs);
299     }
300     return QLIST_NEXT(job, job_list);
301 }
302
303 Job *job_get(const char *id)
304 {
305     Job *job;
306
307     QLIST_FOREACH(job, &jobs, job_list) {
308         if (job->id && !strcmp(id, job->id)) {
309             return job;
310         }
311     }
312
313     return NULL;
314 }
315
316 static void job_sleep_timer_cb(void *opaque)
317 {
318     Job *job = opaque;
319
320     job_enter(job);
321 }
322
323 void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
324                  AioContext *ctx, int flags, BlockCompletionFunc *cb,
325                  void *opaque, Error **errp)
326 {
327     Job *job;
328
329     if (job_id) {
330         if (flags & JOB_INTERNAL) {
331             error_setg(errp, "Cannot specify job ID for internal job");
332             return NULL;
333         }
334         if (!id_wellformed(job_id)) {
335             error_setg(errp, "Invalid job ID '%s'", job_id);
336             return NULL;
337         }
338         if (job_get(job_id)) {
339             error_setg(errp, "Job ID '%s' already in use", job_id);
340             return NULL;
341         }
342     } else if (!(flags & JOB_INTERNAL)) {
343         error_setg(errp, "An explicit job ID is required");
344         return NULL;
345     }
346
347     job = g_malloc0(driver->instance_size);
348     job->driver        = driver;
349     job->id            = g_strdup(job_id);
350     job->refcnt        = 1;
351     job->aio_context   = ctx;
352     job->busy          = false;
353     job->paused        = true;
354     job->pause_count   = 1;
355     job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
356     job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
357     job->cb            = cb;
358     job->opaque        = opaque;
359
360     progress_init(&job->progress);
361
362     notifier_list_init(&job->on_finalize_cancelled);
363     notifier_list_init(&job->on_finalize_completed);
364     notifier_list_init(&job->on_pending);
365     notifier_list_init(&job->on_ready);
366     notifier_list_init(&job->on_idle);
367
368     job_state_transition(job, JOB_STATUS_CREATED);
369     aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
370                    QEMU_CLOCK_REALTIME, SCALE_NS,
371                    job_sleep_timer_cb, job);
372
373     QLIST_INSERT_HEAD(&jobs, job, job_list);
374
375     /* Single jobs are modeled as single-job transactions for sake of
376      * consolidating the job management logic */
377     if (!txn) {
378         txn = job_txn_new();
379         job_txn_add_job(txn, job);
380         job_txn_unref(txn);
381     } else {
382         job_txn_add_job(txn, job);
383     }
384
385     return job;
386 }
387
388 void job_ref(Job *job)
389 {
390     ++job->refcnt;
391 }
392
393 void job_unref(Job *job)
394 {
395     GLOBAL_STATE_CODE();
396
397     if (--job->refcnt == 0) {
398         assert(job->status == JOB_STATUS_NULL);
399         assert(!timer_pending(&job->sleep_timer));
400         assert(!job->txn);
401
402         if (job->driver->free) {
403             job->driver->free(job);
404         }
405
406         QLIST_REMOVE(job, job_list);
407
408         progress_destroy(&job->progress);
409         error_free(job->err);
410         g_free(job->id);
411         g_free(job);
412     }
413 }
414
415 void job_progress_update(Job *job, uint64_t done)
416 {
417     progress_work_done(&job->progress, done);
418 }
419
420 void job_progress_set_remaining(Job *job, uint64_t remaining)
421 {
422     progress_set_remaining(&job->progress, remaining);
423 }
424
425 void job_progress_increase_remaining(Job *job, uint64_t delta)
426 {
427     progress_increase_remaining(&job->progress, delta);
428 }
429
430 void job_event_cancelled(Job *job)
431 {
432     notifier_list_notify(&job->on_finalize_cancelled, job);
433 }
434
435 void job_event_completed(Job *job)
436 {
437     notifier_list_notify(&job->on_finalize_completed, job);
438 }
439
440 static void job_event_pending(Job *job)
441 {
442     notifier_list_notify(&job->on_pending, job);
443 }
444
445 static void job_event_ready(Job *job)
446 {
447     notifier_list_notify(&job->on_ready, job);
448 }
449
450 static void job_event_idle(Job *job)
451 {
452     notifier_list_notify(&job->on_idle, job);
453 }
454
455 void job_enter_cond(Job *job, bool(*fn)(Job *job))
456 {
457     if (!job_started(job)) {
458         return;
459     }
460     if (job->deferred_to_main_loop) {
461         return;
462     }
463
464     real_job_lock();
465     if (job->busy) {
466         real_job_unlock();
467         return;
468     }
469
470     if (fn && !fn(job)) {
471         real_job_unlock();
472         return;
473     }
474
475     assert(!job->deferred_to_main_loop);
476     timer_del(&job->sleep_timer);
477     job->busy = true;
478     real_job_unlock();
479     aio_co_enter(job->aio_context, job->co);
480 }
481
482 void job_enter(Job *job)
483 {
484     job_enter_cond(job, NULL);
485 }
486
487 /* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
488  * Reentering the job coroutine with job_enter() before the timer has expired
489  * is allowed and cancels the timer.
490  *
491  * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
492  * called explicitly. */
493 static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
494 {
495     real_job_lock();
496     if (ns != -1) {
497         timer_mod(&job->sleep_timer, ns);
498     }
499     job->busy = false;
500     job_event_idle(job);
501     real_job_unlock();
502     qemu_coroutine_yield();
503
504     /* Set by job_enter_cond() before re-entering the coroutine.  */
505     assert(job->busy);
506 }
507
508 void coroutine_fn job_pause_point(Job *job)
509 {
510     assert(job && job_started(job));
511
512     if (!job_should_pause(job)) {
513         return;
514     }
515     if (job_is_cancelled(job)) {
516         return;
517     }
518
519     if (job->driver->pause) {
520         job->driver->pause(job);
521     }
522
523     if (job_should_pause(job) && !job_is_cancelled(job)) {
524         JobStatus status = job->status;
525         job_state_transition(job, status == JOB_STATUS_READY
526                                   ? JOB_STATUS_STANDBY
527                                   : JOB_STATUS_PAUSED);
528         job->paused = true;
529         job_do_yield(job, -1);
530         job->paused = false;
531         job_state_transition(job, status);
532     }
533
534     if (job->driver->resume) {
535         job->driver->resume(job);
536     }
537 }
538
539 void coroutine_fn job_yield(Job *job)
540 {
541     assert(job->busy);
542
543     /* Check cancellation *before* setting busy = false, too!  */
544     if (job_is_cancelled(job)) {
545         return;
546     }
547
548     if (!job_should_pause(job)) {
549         job_do_yield(job, -1);
550     }
551
552     job_pause_point(job);
553 }
554
555 void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
556 {
557     assert(job->busy);
558
559     /* Check cancellation *before* setting busy = false, too!  */
560     if (job_is_cancelled(job)) {
561         return;
562     }
563
564     if (!job_should_pause(job)) {
565         job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
566     }
567
568     job_pause_point(job);
569 }
570
571 /* Assumes the block_job_mutex is held */
572 static bool job_timer_not_pending(Job *job)
573 {
574     return !timer_pending(&job->sleep_timer);
575 }
576
577 void job_pause(Job *job)
578 {
579     job->pause_count++;
580     if (!job->paused) {
581         job_enter(job);
582     }
583 }
584
585 void job_resume(Job *job)
586 {
587     assert(job->pause_count > 0);
588     job->pause_count--;
589     if (job->pause_count) {
590         return;
591     }
592
593     /* kick only if no timer is pending */
594     job_enter_cond(job, job_timer_not_pending);
595 }
596
597 void job_user_pause(Job *job, Error **errp)
598 {
599     if (job_apply_verb(job, JOB_VERB_PAUSE, errp)) {
600         return;
601     }
602     if (job->user_paused) {
603         error_setg(errp, "Job is already paused");
604         return;
605     }
606     job->user_paused = true;
607     job_pause(job);
608 }
609
610 bool job_user_paused(Job *job)
611 {
612     return job->user_paused;
613 }
614
615 void job_user_resume(Job *job, Error **errp)
616 {
617     assert(job);
618     GLOBAL_STATE_CODE();
619     if (!job->user_paused || job->pause_count <= 0) {
620         error_setg(errp, "Can't resume a job that was not paused");
621         return;
622     }
623     if (job_apply_verb(job, JOB_VERB_RESUME, errp)) {
624         return;
625     }
626     if (job->driver->user_resume) {
627         job->driver->user_resume(job);
628     }
629     job->user_paused = false;
630     job_resume(job);
631 }
632
633 static void job_do_dismiss(Job *job)
634 {
635     assert(job);
636     job->busy = false;
637     job->paused = false;
638     job->deferred_to_main_loop = true;
639
640     job_txn_del_job(job);
641
642     job_state_transition(job, JOB_STATUS_NULL);
643     job_unref(job);
644 }
645
646 void job_dismiss(Job **jobptr, Error **errp)
647 {
648     Job *job = *jobptr;
649     /* similarly to _complete, this is QMP-interface only. */
650     assert(job->id);
651     if (job_apply_verb(job, JOB_VERB_DISMISS, errp)) {
652         return;
653     }
654
655     job_do_dismiss(job);
656     *jobptr = NULL;
657 }
658
659 void job_early_fail(Job *job)
660 {
661     assert(job->status == JOB_STATUS_CREATED);
662     job_do_dismiss(job);
663 }
664
665 static void job_conclude(Job *job)
666 {
667     job_state_transition(job, JOB_STATUS_CONCLUDED);
668     if (job->auto_dismiss || !job_started(job)) {
669         job_do_dismiss(job);
670     }
671 }
672
673 static void job_update_rc(Job *job)
674 {
675     if (!job->ret && job_is_cancelled(job)) {
676         job->ret = -ECANCELED;
677     }
678     if (job->ret) {
679         if (!job->err) {
680             error_setg(&job->err, "%s", strerror(-job->ret));
681         }
682         job_state_transition(job, JOB_STATUS_ABORTING);
683     }
684 }
685
686 static void job_commit(Job *job)
687 {
688     assert(!job->ret);
689     GLOBAL_STATE_CODE();
690     if (job->driver->commit) {
691         job->driver->commit(job);
692     }
693 }
694
695 static void job_abort(Job *job)
696 {
697     assert(job->ret);
698     GLOBAL_STATE_CODE();
699     if (job->driver->abort) {
700         job->driver->abort(job);
701     }
702 }
703
704 static void job_clean(Job *job)
705 {
706     GLOBAL_STATE_CODE();
707     if (job->driver->clean) {
708         job->driver->clean(job);
709     }
710 }
711
712 static int job_finalize_single(Job *job)
713 {
714     assert(job_is_completed(job));
715
716     /* Ensure abort is called for late-transactional failures */
717     job_update_rc(job);
718
719     if (!job->ret) {
720         job_commit(job);
721     } else {
722         job_abort(job);
723     }
724     job_clean(job);
725
726     if (job->cb) {
727         job->cb(job->opaque, job->ret);
728     }
729
730     /* Emit events only if we actually started */
731     if (job_started(job)) {
732         if (job_is_cancelled(job)) {
733             job_event_cancelled(job);
734         } else {
735             job_event_completed(job);
736         }
737     }
738
739     job_txn_del_job(job);
740     job_conclude(job);
741     return 0;
742 }
743
744 static void job_cancel_async(Job *job, bool force)
745 {
746     GLOBAL_STATE_CODE();
747     if (job->driver->cancel) {
748         force = job->driver->cancel(job, force);
749     } else {
750         /* No .cancel() means the job will behave as if force-cancelled */
751         force = true;
752     }
753
754     if (job->user_paused) {
755         /* Do not call job_enter here, the caller will handle it.  */
756         if (job->driver->user_resume) {
757             job->driver->user_resume(job);
758         }
759         job->user_paused = false;
760         assert(job->pause_count > 0);
761         job->pause_count--;
762     }
763
764     /*
765      * Ignore soft cancel requests after the job is already done
766      * (We will still invoke job->driver->cancel() above, but if the
767      * job driver supports soft cancelling and the job is done, that
768      * should be a no-op, too.  We still call it so it can override
769      * @force.)
770      */
771     if (force || !job->deferred_to_main_loop) {
772         job->cancelled = true;
773         /* To prevent 'force == false' overriding a previous 'force == true' */
774         job->force_cancel |= force;
775     }
776 }
777
778 static void job_completed_txn_abort(Job *job)
779 {
780     AioContext *ctx;
781     JobTxn *txn = job->txn;
782     Job *other_job;
783
784     if (txn->aborting) {
785         /*
786          * We are cancelled by another job, which will handle everything.
787          */
788         return;
789     }
790     txn->aborting = true;
791     job_txn_ref(txn);
792
793     /*
794      * We can only hold the single job's AioContext lock while calling
795      * job_finalize_single() because the finalization callbacks can involve
796      * calls of AIO_WAIT_WHILE(), which could deadlock otherwise.
797      * Note that the job's AioContext may change when it is finalized.
798      */
799     job_ref(job);
800     aio_context_release(job->aio_context);
801
802     /* Other jobs are effectively cancelled by us, set the status for
803      * them; this job, however, may or may not be cancelled, depending
804      * on the caller, so leave it. */
805     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
806         if (other_job != job) {
807             ctx = other_job->aio_context;
808             aio_context_acquire(ctx);
809             /*
810              * This is a transaction: If one job failed, no result will matter.
811              * Therefore, pass force=true to terminate all other jobs as quickly
812              * as possible.
813              */
814             job_cancel_async(other_job, true);
815             aio_context_release(ctx);
816         }
817     }
818     while (!QLIST_EMPTY(&txn->jobs)) {
819         other_job = QLIST_FIRST(&txn->jobs);
820         /*
821          * The job's AioContext may change, so store it in @ctx so we
822          * release the same context that we have acquired before.
823          */
824         ctx = other_job->aio_context;
825         aio_context_acquire(ctx);
826         if (!job_is_completed(other_job)) {
827             assert(job_cancel_requested(other_job));
828             job_finish_sync(other_job, NULL, NULL);
829         }
830         job_finalize_single(other_job);
831         aio_context_release(ctx);
832     }
833
834     /*
835      * Use job_ref()/job_unref() so we can read the AioContext here
836      * even if the job went away during job_finalize_single().
837      */
838     aio_context_acquire(job->aio_context);
839     job_unref(job);
840
841     job_txn_unref(txn);
842 }
843
844 static int job_prepare(Job *job)
845 {
846     GLOBAL_STATE_CODE();
847     if (job->ret == 0 && job->driver->prepare) {
848         job->ret = job->driver->prepare(job);
849         job_update_rc(job);
850     }
851     return job->ret;
852 }
853
854 static int job_needs_finalize(Job *job)
855 {
856     return !job->auto_finalize;
857 }
858
859 static void job_do_finalize(Job *job)
860 {
861     int rc;
862     assert(job && job->txn);
863
864     /* prepare the transaction to complete */
865     rc = job_txn_apply(job, job_prepare);
866     if (rc) {
867         job_completed_txn_abort(job);
868     } else {
869         job_txn_apply(job, job_finalize_single);
870     }
871 }
872
873 void job_finalize(Job *job, Error **errp)
874 {
875     assert(job && job->id);
876     if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) {
877         return;
878     }
879     job_do_finalize(job);
880 }
881
882 static int job_transition_to_pending(Job *job)
883 {
884     job_state_transition(job, JOB_STATUS_PENDING);
885     if (!job->auto_finalize) {
886         job_event_pending(job);
887     }
888     return 0;
889 }
890
891 void job_transition_to_ready(Job *job)
892 {
893     job_state_transition(job, JOB_STATUS_READY);
894     job_event_ready(job);
895 }
896
897 static void job_completed_txn_success(Job *job)
898 {
899     JobTxn *txn = job->txn;
900     Job *other_job;
901
902     job_state_transition(job, JOB_STATUS_WAITING);
903
904     /*
905      * Successful completion, see if there are other running jobs in this
906      * txn.
907      */
908     QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
909         if (!job_is_completed(other_job)) {
910             return;
911         }
912         assert(other_job->ret == 0);
913     }
914
915     job_txn_apply(job, job_transition_to_pending);
916
917     /* If no jobs need manual finalization, automatically do so */
918     if (job_txn_apply(job, job_needs_finalize) == 0) {
919         job_do_finalize(job);
920     }
921 }
922
923 static void job_completed(Job *job)
924 {
925     assert(job && job->txn && !job_is_completed(job));
926
927     job_update_rc(job);
928     trace_job_completed(job, job->ret);
929     if (job->ret) {
930         job_completed_txn_abort(job);
931     } else {
932         job_completed_txn_success(job);
933     }
934 }
935
936 /** Useful only as a type shim for aio_bh_schedule_oneshot. */
937 static void job_exit(void *opaque)
938 {
939     Job *job = (Job *)opaque;
940     AioContext *ctx;
941
942     job_ref(job);
943     aio_context_acquire(job->aio_context);
944
945     /* This is a lie, we're not quiescent, but still doing the completion
946      * callbacks. However, completion callbacks tend to involve operations that
947      * drain block nodes, and if .drained_poll still returned true, we would
948      * deadlock. */
949     job->busy = false;
950     job_event_idle(job);
951
952     job_completed(job);
953
954     /*
955      * Note that calling job_completed can move the job to a different
956      * aio_context, so we cannot cache from above. job_txn_apply takes care of
957      * acquiring the new lock, and we ref/unref to avoid job_completed freeing
958      * the job underneath us.
959      */
960     ctx = job->aio_context;
961     job_unref(job);
962     aio_context_release(ctx);
963 }
964
965 /**
966  * All jobs must allow a pause point before entering their job proper. This
967  * ensures that jobs can be paused prior to being started, then resumed later.
968  */
969 static void coroutine_fn job_co_entry(void *opaque)
970 {
971     Job *job = opaque;
972
973     assert(job && job->driver && job->driver->run);
974     assert(job->aio_context == qemu_get_current_aio_context());
975     job_pause_point(job);
976     job->ret = job->driver->run(job, &job->err);
977     job->deferred_to_main_loop = true;
978     job->busy = true;
979     aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
980 }
981
982 void job_start(Job *job)
983 {
984     assert(job && !job_started(job) && job->paused &&
985            job->driver && job->driver->run);
986     job->co = qemu_coroutine_create(job_co_entry, job);
987     job->pause_count--;
988     job->busy = true;
989     job->paused = false;
990     job_state_transition(job, JOB_STATUS_RUNNING);
991     aio_co_enter(job->aio_context, job->co);
992 }
993
994 void job_cancel(Job *job, bool force)
995 {
996     if (job->status == JOB_STATUS_CONCLUDED) {
997         job_do_dismiss(job);
998         return;
999     }
1000     job_cancel_async(job, force);
1001     if (!job_started(job)) {
1002         job_completed(job);
1003     } else if (job->deferred_to_main_loop) {
1004         /*
1005          * job_cancel_async() ignores soft-cancel requests for jobs
1006          * that are already done (i.e. deferred to the main loop).  We
1007          * have to check again whether the job is really cancelled.
1008          * (job_cancel_requested() and job_is_cancelled() are equivalent
1009          * here, because job_cancel_async() will make soft-cancel
1010          * requests no-ops when deferred_to_main_loop is true.  We
1011          * choose to call job_is_cancelled() to show that we invoke
1012          * job_completed_txn_abort() only for force-cancelled jobs.)
1013          */
1014         if (job_is_cancelled(job)) {
1015             job_completed_txn_abort(job);
1016         }
1017     } else {
1018         job_enter(job);
1019     }
1020 }
1021
1022 void job_user_cancel(Job *job, bool force, Error **errp)
1023 {
1024     if (job_apply_verb(job, JOB_VERB_CANCEL, errp)) {
1025         return;
1026     }
1027     job_cancel(job, force);
1028 }
1029
1030 /* A wrapper around job_cancel() taking an Error ** parameter so it may be
1031  * used with job_finish_sync() without the need for (rather nasty) function
1032  * pointer casts there. */
1033 static void job_cancel_err(Job *job, Error **errp)
1034 {
1035     job_cancel(job, false);
1036 }
1037
1038 /**
1039  * Same as job_cancel_err(), but force-cancel.
1040  */
1041 static void job_force_cancel_err(Job *job, Error **errp)
1042 {
1043     job_cancel(job, true);
1044 }
1045
1046 int job_cancel_sync(Job *job, bool force)
1047 {
1048     if (force) {
1049         return job_finish_sync(job, &job_force_cancel_err, NULL);
1050     } else {
1051         return job_finish_sync(job, &job_cancel_err, NULL);
1052     }
1053 }
1054
1055 void job_cancel_sync_all(void)
1056 {
1057     Job *job;
1058     AioContext *aio_context;
1059
1060     while ((job = job_next(NULL))) {
1061         aio_context = job->aio_context;
1062         aio_context_acquire(aio_context);
1063         job_cancel_sync(job, true);
1064         aio_context_release(aio_context);
1065     }
1066 }
1067
1068 int job_complete_sync(Job *job, Error **errp)
1069 {
1070     return job_finish_sync(job, job_complete, errp);
1071 }
1072
1073 void job_complete(Job *job, Error **errp)
1074 {
1075     /* Should not be reachable via external interface for internal jobs */
1076     assert(job->id);
1077     GLOBAL_STATE_CODE();
1078     if (job_apply_verb(job, JOB_VERB_COMPLETE, errp)) {
1079         return;
1080     }
1081     if (job_cancel_requested(job) || !job->driver->complete) {
1082         error_setg(errp, "The active block job '%s' cannot be completed",
1083                    job->id);
1084         return;
1085     }
1086
1087     job->driver->complete(job, errp);
1088 }
1089
1090 int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
1091 {
1092     Error *local_err = NULL;
1093     int ret;
1094
1095     job_ref(job);
1096
1097     if (finish) {
1098         finish(job, &local_err);
1099     }
1100     if (local_err) {
1101         error_propagate(errp, local_err);
1102         job_unref(job);
1103         return -EBUSY;
1104     }
1105
1106     AIO_WAIT_WHILE(job->aio_context,
1107                    (job_enter(job), !job_is_completed(job)));
1108
1109     ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
1110     job_unref(job);
1111     return ret;
1112 }