blockjob: implement .change_aio_ctx in child_job
[qmiga/qemu.git] / blockjob.c
/*
 * QEMU System Emulator block driver
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "block/block.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
#include "block/trace.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "qapi/qapi-events-block-core.h"
#include "qapi/qmp/qerror.h"
#include "qemu/coroutine.h"
#include "qemu/main-loop.h"
#include "qemu/timer.h"

static bool is_block_job(Job *job)
{
    return job_type(job) == JOB_TYPE_BACKUP ||
           job_type(job) == JOB_TYPE_COMMIT ||
           job_type(job) == JOB_TYPE_MIRROR ||
           job_type(job) == JOB_TYPE_STREAM;
}

BlockJob *block_job_next_locked(BlockJob *bjob)
{
    Job *job = bjob ? &bjob->job : NULL;
    GLOBAL_STATE_CODE();

    do {
        job = job_next_locked(job);
    } while (job && !is_block_job(job));

    return job ? container_of(job, BlockJob, job) : NULL;
}

BlockJob *block_job_get_locked(const char *id)
{
    Job *job = job_get_locked(id);
    GLOBAL_STATE_CODE();

    if (job && is_block_job(job)) {
        return container_of(job, BlockJob, job);
    } else {
        return NULL;
    }
}

BlockJob *block_job_get(const char *id)
{
    JOB_LOCK_GUARD();
    return block_job_get_locked(id);
}

void block_job_free(Job *job)
{
    BlockJob *bjob = container_of(job, BlockJob, job);
    GLOBAL_STATE_CODE();

    block_job_remove_all_bdrv(bjob);
    ratelimit_destroy(&bjob->limit);
    error_free(bjob->blocker);
}

static char *child_job_get_parent_desc(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
}

static void child_job_drained_begin(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    job_pause(&job->job);
}

static bool child_job_drained_poll(BdrvChild *c)
{
    BlockJob *bjob = c->opaque;
    Job *job = &bjob->job;
    const BlockJobDriver *drv = block_job_driver(bjob);

    /* An inactive or completed job doesn't have any pending requests. Jobs
     * with !job->busy are either already paused or have a pause point after
     * being reentered, so no job driver code will run before they pause. */
    WITH_JOB_LOCK_GUARD() {
        if (!job->busy || job_is_completed_locked(job)) {
            return false;
        }
    }

    /* Otherwise, assume that it isn't fully stopped yet, but allow the job to
     * override this assumption. */
    if (drv->drained_poll) {
        return drv->drained_poll(bjob);
    } else {
        return true;
    }
}
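
/*
 * How the three drained callbacks in this class fit together (an informal
 * sketch, not taken from this file's comments): bdrv_drained_begin() on a
 * node the job is attached to invokes .drained_begin, which pauses the job;
 * the drain loop then polls .drained_poll until the job has actually reached
 * its pause point; .drained_end resumes it. A caller would look roughly
 * like:
 *
 *     bdrv_drained_begin(bs);   // pauses attached jobs, waits for quiescence
 *     ... operate on bs with no job I/O in flight ...
 *     bdrv_drained_end(bs);     // resumes the jobs
 */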

static void child_job_drained_end(BdrvChild *c, int *drained_end_counter)
{
    BlockJob *job = c->opaque;
    job_resume(&job->job);
}

typedef struct BdrvStateChildJobContext {
    AioContext *new_ctx;
    BlockJob *job;
} BdrvStateChildJobContext;

static void child_job_set_aio_ctx_commit(void *opaque)
{
    BdrvStateChildJobContext *s = opaque;
    BlockJob *job = s->job;

    job_set_aio_context(&job->job, s->new_ctx);
}

static TransactionActionDrv change_child_job_context = {
    .commit = child_job_set_aio_ctx_commit,
    .clean = g_free,
};

static bool child_job_change_aio_ctx(BdrvChild *c, AioContext *ctx,
                                     GHashTable *visited, Transaction *tran,
                                     Error **errp)
{
    BlockJob *job = c->opaque;
    BdrvStateChildJobContext *s;
    GSList *l;

    for (l = job->nodes; l; l = l->next) {
        BdrvChild *sibling = l->data;
        if (!bdrv_child_change_aio_context(sibling, ctx, visited,
                                           tran, errp)) {
            return false;
        }
    }

    s = g_new(BdrvStateChildJobContext, 1);
    *s = (BdrvStateChildJobContext) {
        .new_ctx = ctx,
        .job = job,
    };

    tran_add(tran, &change_child_job_context, s);
    return true;
}
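
/*
 * A rough sketch of how the transactional AioContext switch is driven (an
 * assumption based on QEMU's generic transaction helpers, not code from this
 * file): the caller builds a Transaction, every affected parent adds an
 * action such as change_child_job_context above, and only once the whole
 * graph has agreed does the commit phase apply the change:
 *
 *     Transaction *tran = tran_new();
 *     if (!bdrv_child_change_aio_context(child, new_ctx, visited,
 *                                        tran, errp)) {
 *         tran_abort(tran);   // run .abort/.clean, nothing is changed
 *     } else {
 *         tran_commit(tran);  // run .commit, e.g. job_set_aio_context()
 *     }
 */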

static bool child_job_can_set_aio_ctx(BdrvChild *c, AioContext *ctx,
                                      GSList **ignore, Error **errp)
{
    BlockJob *job = c->opaque;
    GSList *l;

    for (l = job->nodes; l; l = l->next) {
        BdrvChild *sibling = l->data;
        if (!bdrv_child_can_set_aio_context(sibling, ctx, ignore, errp)) {
            return false;
        }
    }
    return true;
}

static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
                                  GSList **ignore)
{
    BlockJob *job = c->opaque;
    GSList *l;

    for (l = job->nodes; l; l = l->next) {
        BdrvChild *sibling = l->data;
        if (g_slist_find(*ignore, sibling)) {
            continue;
        }
        *ignore = g_slist_prepend(*ignore, sibling);
        bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
    }

    job_set_aio_context(&job->job, ctx);
}

static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    GLOBAL_STATE_CODE();

    return job->job.aio_context;
}

static const BdrvChildClass child_job = {
    .get_parent_desc    = child_job_get_parent_desc,
    .drained_begin      = child_job_drained_begin,
    .drained_poll       = child_job_drained_poll,
    .drained_end        = child_job_drained_end,
    .can_set_aio_ctx    = child_job_can_set_aio_ctx,
    .set_aio_ctx        = child_job_set_aio_ctx,
    .change_aio_ctx     = child_job_change_aio_ctx,
    .stay_at_node       = true,
    .get_parent_aio_context = child_job_get_parent_aio_context,
};
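
/*
 * Note that child_job implements both the legacy two-phase interface
 * (.can_set_aio_ctx / .set_aio_ctx) and the transaction-based
 * .change_aio_ctx added by this commit. During the transition the generic
 * block layer can prefer the new callback when it is present; a plausible
 * dispatch (illustrative only, not this file's code) would be:
 *
 *     if (c->klass->change_aio_ctx) {
 *         return c->klass->change_aio_ctx(c, ctx, visited, tran, errp);
 *     }
 *     // otherwise fall back to can_set_aio_ctx() + set_aio_ctx()
 */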

void block_job_remove_all_bdrv(BlockJob *job)
{
    GLOBAL_STATE_CODE();
    /*
     * bdrv_root_unref_child() may reach child_job_[can_]set_aio_ctx(),
     * which will also traverse job->nodes, so consume the list one by
     * one to make sure that such a concurrent access does not attempt
     * to process an already freed BdrvChild.
     */
    while (job->nodes) {
        GSList *l = job->nodes;
        BdrvChild *c = l->data;

        job->nodes = l->next;

        bdrv_op_unblock_all(c->bs, job->blocker);
        bdrv_root_unref_child(c);

        g_slist_free_1(l);
    }
}

bool block_job_has_bdrv(BlockJob *job, BlockDriverState *bs)
{
    GSList *el;
    GLOBAL_STATE_CODE();

    for (el = job->nodes; el; el = el->next) {
        BdrvChild *c = el->data;
        if (c->bs == bs) {
            return true;
        }
    }

    return false;
}

int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
                       uint64_t perm, uint64_t shared_perm, Error **errp)
{
    BdrvChild *c;
    bool need_context_ops;
    GLOBAL_STATE_CODE();

    bdrv_ref(bs);

    need_context_ops = bdrv_get_aio_context(bs) != job->job.aio_context;

    if (need_context_ops && job->job.aio_context != qemu_get_aio_context()) {
        aio_context_release(job->job.aio_context);
    }
    c = bdrv_root_attach_child(bs, name, &child_job, 0, perm, shared_perm, job,
                               errp);
    if (need_context_ops && job->job.aio_context != qemu_get_aio_context()) {
        aio_context_acquire(job->job.aio_context);
    }
    if (c == NULL) {
        return -EPERM;
    }

    job->nodes = g_slist_prepend(job->nodes, c);
    bdrv_op_block_all(bs, job->blocker);

    return 0;
}
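
/*
 * Typical driver-side use of block_job_add_bdrv() (a sketch modelled on how
 * jobs like mirror attach extra nodes; names and permissions here are
 * illustrative): after block_job_create() has attached the main node, any
 * additional node the job touches is registered so that it migrates between
 * AioContexts together with the job and is blocked for other users:
 *
 *     ret = block_job_add_bdrv(&s->common, "target", target_bs,
 *                              0, BLK_PERM_ALL, errp);
 *     if (ret < 0) {
 *         goto fail;
 *     }
 */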

/* Called with job_mutex lock held. */
static void block_job_on_idle_locked(Notifier *n, void *opaque)
{
    aio_wait_kick();
}

bool block_job_is_internal(BlockJob *job)
{
    return (job->job.id == NULL);
}

const BlockJobDriver *block_job_driver(BlockJob *job)
{
    return container_of(job->job.driver, BlockJobDriver, job_driver);
}

/* Assumes the job_mutex is held */
static bool job_timer_pending(Job *job)
{
    return timer_pending(&job->sleep_timer);
}

bool block_job_set_speed_locked(BlockJob *job, int64_t speed, Error **errp)
{
    const BlockJobDriver *drv = block_job_driver(job);
    int64_t old_speed = job->speed;

    GLOBAL_STATE_CODE();

    if (job_apply_verb_locked(&job->job, JOB_VERB_SET_SPEED, errp) < 0) {
        return false;
    }
    if (speed < 0) {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "speed",
                   "a non-negative value");
        return false;
    }

    ratelimit_set_speed(&job->limit, speed, BLOCK_JOB_SLICE_TIME);

    job->speed = speed;

    if (drv->set_speed) {
        job_unlock();
        drv->set_speed(job, speed);
        job_lock();
    }

    if (speed && speed <= old_speed) {
        return true;
    }

    /* kick only if a timer is pending */
    job_enter_cond_locked(&job->job, job_timer_pending);

    return true;
}

static bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    JOB_LOCK_GUARD();
    return block_job_set_speed_locked(job, speed, errp);
}

int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
{
    IO_CODE();
    return ratelimit_calculate_delay(&job->limit, n);
}
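
/*
 * How a job driver typically consumes the rate limit (a sketch of the
 * common pattern in QEMU job drivers such as stream; variable names are
 * illustrative): after processing n units of work, ask the limiter how long
 * to sleep so the configured speed is respected, then sleep in a way that
 * remains pause- and cancel-aware:
 *
 *     int64_t delay_ns = block_job_ratelimit_get_delay(&s->common, n);
 *     job_sleep_ns(&s->common.job, delay_ns);
 */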

BlockJobInfo *block_job_query_locked(BlockJob *job, Error **errp)
{
    BlockJobInfo *info;
    uint64_t progress_current, progress_total;

    GLOBAL_STATE_CODE();

    if (block_job_is_internal(job)) {
        error_setg(errp, "Cannot query QEMU internal jobs");
        return NULL;
    }

    progress_get_snapshot(&job->job.progress, &progress_current,
                          &progress_total);

    info = g_new0(BlockJobInfo, 1);
    info->type      = g_strdup(job_type_str(&job->job));
    info->device    = g_strdup(job->job.id);
    info->busy      = job->job.busy;
    info->paused    = job->job.pause_count > 0;
    info->offset    = progress_current;
    info->len       = progress_total;
    info->speed     = job->speed;
    info->io_status = job->iostatus;
    info->ready     = job_is_ready_locked(&job->job);
    info->status    = job->job.status;
    info->auto_finalize = job->job.auto_finalize;
    info->auto_dismiss  = job->job.auto_dismiss;
    if (job->job.ret) {
        info->has_error = true;
        info->error = job->job.err ?
                        g_strdup(error_get_pretty(job->job.err)) :
                        g_strdup(strerror(-job->job.ret));
    }
    return info;
}
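
/*
 * The BlockJobInfo built above is what QMP's query-block-jobs returns. A
 * hypothetical reply for a running stream job might look like this (field
 * values are illustrative):
 *
 *     { "type": "stream", "device": "job0", "busy": true, "paused": false,
 *       "offset": 1048576, "len": 10485760, "speed": 0,
 *       "io-status": "ok", "ready": false, "status": "running",
 *       "auto-finalize": true, "auto-dismiss": true }
 */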

/* Called with job lock held */
static void block_job_iostatus_set_err_locked(BlockJob *job, int error)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
                                          BLOCK_DEVICE_IO_STATUS_FAILED;
    }
}

/* Called with job_mutex lock held. */
static void block_job_event_cancelled_locked(Notifier *n, void *opaque)
{
    BlockJob *job = opaque;
    uint64_t progress_current, progress_total;

    if (block_job_is_internal(job)) {
        return;
    }

    progress_get_snapshot(&job->job.progress, &progress_current,
                          &progress_total);

    qapi_event_send_block_job_cancelled(job_type(&job->job),
                                        job->job.id,
                                        progress_total,
                                        progress_current,
                                        job->speed);
}

/* Called with job_mutex lock held. */
static void block_job_event_completed_locked(Notifier *n, void *opaque)
{
    BlockJob *job = opaque;
    const char *msg = NULL;
    uint64_t progress_current, progress_total;

    if (block_job_is_internal(job)) {
        return;
    }

    if (job->job.ret < 0) {
        msg = error_get_pretty(job->job.err);
    }

    progress_get_snapshot(&job->job.progress, &progress_current,
                          &progress_total);

    qapi_event_send_block_job_completed(job_type(&job->job),
                                        job->job.id,
                                        progress_total,
                                        progress_current,
                                        job->speed,
                                        !!msg,
                                        msg);
}

/* Called with job_mutex lock held. */
static void block_job_event_pending_locked(Notifier *n, void *opaque)
{
    BlockJob *job = opaque;

    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_pending(job_type(&job->job),
                                      job->job.id);
}

/* Called with job_mutex lock held. */
static void block_job_event_ready_locked(Notifier *n, void *opaque)
{
    BlockJob *job = opaque;
    uint64_t progress_current, progress_total;

    if (block_job_is_internal(job)) {
        return;
    }

    progress_get_snapshot(&job->job.progress, &progress_current,
                          &progress_total);

    qapi_event_send_block_job_ready(job_type(&job->job),
                                    job->job.id,
                                    progress_total,
                                    progress_current,
                                    job->speed);
}


void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       JobTxn *txn, BlockDriverState *bs, uint64_t perm,
                       uint64_t shared_perm, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockJob *job;
    int ret;
    GLOBAL_STATE_CODE();

    if (job_id == NULL && !(flags & JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
    }

    job = job_create(job_id, &driver->job_driver, txn, bdrv_get_aio_context(bs),
                     flags, cb, opaque, errp);
    if (job == NULL) {
        return NULL;
    }

    assert(is_block_job(&job->job));
    assert(job->job.driver->free == &block_job_free);
    assert(job->job.driver->user_resume == &block_job_user_resume);

    ratelimit_init(&job->limit);

    job->finalize_cancelled_notifier.notify = block_job_event_cancelled_locked;
    job->finalize_completed_notifier.notify = block_job_event_completed_locked;
    job->pending_notifier.notify = block_job_event_pending_locked;
    job->ready_notifier.notify = block_job_event_ready_locked;
    job->idle_notifier.notify = block_job_on_idle_locked;

    WITH_JOB_LOCK_GUARD() {
        notifier_list_add(&job->job.on_finalize_cancelled,
                          &job->finalize_cancelled_notifier);
        notifier_list_add(&job->job.on_finalize_completed,
                          &job->finalize_completed_notifier);
        notifier_list_add(&job->job.on_pending, &job->pending_notifier);
        notifier_list_add(&job->job.on_ready, &job->ready_notifier);
        notifier_list_add(&job->job.on_idle, &job->idle_notifier);
    }

    error_setg(&job->blocker, "block device is in use by block job: %s",
               job_type_str(&job->job));

    ret = block_job_add_bdrv(job, "main node", bs, perm, shared_perm, errp);
    if (ret < 0) {
        goto fail;
    }

    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    if (!block_job_set_speed(job, speed, errp)) {
        goto fail;
    }

    return job;

fail:
    job_early_fail(&job->job);
    return NULL;
}
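
/*
 * A driver's entry point typically wraps block_job_create() roughly like
 * this (a sketch only; the "stream" names and exact argument values are
 * assumptions, not taken from this file):
 *
 *     s = block_job_create(job_id, &stream_job_driver, NULL, bs,
 *                          0, BLK_PERM_ALL, speed, creation_flags,
 *                          NULL, NULL, errp);
 *     if (!s) {
 *         return;
 *     }
 *     ... driver-specific setup ...
 *     job_start(&s->common.job);
 */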

void block_job_iostatus_reset_locked(BlockJob *job)
{
    GLOBAL_STATE_CODE();
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        return;
    }
    assert(job->job.user_paused && job->job.pause_count > 0);
    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}

static void block_job_iostatus_reset(BlockJob *job)
{
    JOB_LOCK_GUARD();
    block_job_iostatus_reset_locked(job);
}

void block_job_user_resume(Job *job)
{
    BlockJob *bjob = container_of(job, BlockJob, job);
    GLOBAL_STATE_CODE();
    block_job_iostatus_reset(bjob);
}

BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                        int is_read, int error)
{
    BlockErrorAction action;
    IO_CODE();

    switch (on_err) {
    case BLOCKDEV_ON_ERROR_ENOSPC:
    case BLOCKDEV_ON_ERROR_AUTO:
        action = (error == ENOSPC) ?
                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_STOP:
        action = BLOCK_ERROR_ACTION_STOP;
        break;
    case BLOCKDEV_ON_ERROR_REPORT:
        action = BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_IGNORE:
        action = BLOCK_ERROR_ACTION_IGNORE;
        break;
    default:
        abort();
    }
    if (!block_job_is_internal(job)) {
        qapi_event_send_block_job_error(job->job.id,
                                        is_read ? IO_OPERATION_TYPE_READ :
                                        IO_OPERATION_TYPE_WRITE,
                                        action);
    }
    if (action == BLOCK_ERROR_ACTION_STOP) {
        WITH_JOB_LOCK_GUARD() {
            if (!job->job.user_paused) {
                job_pause_locked(&job->job);
                /*
                 * Make the pause user visible, so that it can be
                 * resumed from QMP.
                 */
                job->job.user_paused = true;
            }
            block_job_iostatus_set_err_locked(job, error);
        }
    }
    return action;
}
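
/*
 * Driver-side error handling usually funnels through this helper; a sketch
 * of the common pattern (modelled on mirror-style jobs, with illustrative
 * names): map the errno from a failed request to an action, and remember
 * the first fatal error when the policy says to report it:
 *
 *     BlockErrorAction action;
 *
 *     action = block_job_error_action(&s->common, s->on_error,
 *                                     is_read, -ret);
 *     if (action == BLOCK_ERROR_ACTION_REPORT && s->ret >= 0) {
 *         s->ret = ret;
 *     }
 */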

AioContext *block_job_get_aio_context(BlockJob *job)
{
    GLOBAL_STATE_CODE();
    return job->job.aio_context;
}