OSDN Git Service

multifd: Add the ramblock to MultiFDRecvParams
[qmiga/qemu.git] / migration / multifd.c
index 7aa030f..5c4298e 100644 (file)
 #include "qapi/error.h"
 #include "ram.h"
 #include "migration.h"
+#include "migration-stats.h"
 #include "socket.h"
 #include "tls.h"
 #include "qemu-file.h"
 #include "trace.h"
 #include "multifd.h"
 #include "threadinfo.h"
-
+#include "options.h"
 #include "qemu/yank.h"
 #include "io/channel-socket.h"
 #include "yank_functions.h"
@@ -280,7 +281,6 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
 static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 {
     MultiFDPacket_t *packet = p->packet;
-    RAMBlock *block;
     int i;
 
     packet->magic = be32_to_cpu(packet->magic);
@@ -330,21 +330,21 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
 
     /* make sure that ramblock is 0 terminated */
     packet->ramblock[255] = 0;
-    block = qemu_ram_block_by_name(packet->ramblock);
-    if (!block) {
+    p->block = qemu_ram_block_by_name(packet->ramblock);
+    if (!p->block) {
         error_setg(errp, "multifd: unknown ram block %s",
                    packet->ramblock);
         return -1;
     }
 
-    p->host = block->host;
+    p->host = p->block->host;
     for (i = 0; i < p->normal_num; i++) {
         uint64_t offset = be64_to_cpu(packet->offset[i]);
 
-        if (offset > (block->used_length - p->page_size)) {
+        if (offset > (p->block->used_length - p->page_size)) {
             error_setg(errp, "multifd: offset too long %" PRIu64
                        " (max " RAM_ADDR_FMT ")",
-                       offset, block->used_length);
+                       offset, p->block->used_length);
             return -1;
         }
         p->normal[i] = offset;
@@ -432,9 +432,9 @@ static int multifd_send_pages(QEMUFile *f)
     p->pages = pages;
     transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len;
     qemu_file_acct_rate_limit(f, transferred);
-    ram_counters.multifd_bytes += transferred;
-    stat64_add(&ram_atomic_counters.transferred, transferred);
     qemu_mutex_unlock(&p->mutex);
+    stat64_add(&mig_stats.transferred, transferred);
+    stat64_add(&mig_stats.multifd_bytes, transferred);
     qemu_sem_post(&p->sem);
 
     return 1;
@@ -516,7 +516,7 @@ void multifd_save_cleanup(void)
 {
     int i;
 
-    if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) {
+    if (!migrate_multifd()) {
         return;
     }
     multifd_send_terminate_threads(NULL);
@@ -576,7 +576,7 @@ static int multifd_zero_copy_flush(QIOChannel *c)
         return -1;
     }
     if (ret == 1) {
-        dirty_sync_missed_zero_copy();
+        stat64_add(&mig_stats.dirty_sync_missed_zero_copy, 1);
     }
 
     return ret;
@@ -587,7 +587,7 @@ int multifd_send_sync_main(QEMUFile *f)
     int i;
     bool flush_zero_copy;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_multifd()) {
         return 0;
     }
     if (multifd_send_state->pages->num) {
@@ -608,7 +608,7 @@ int multifd_send_sync_main(QEMUFile *f)
      * all the dirty bitmaps.
      */
 
-    flush_zero_copy = migrate_use_zero_copy_send();
+    flush_zero_copy = migrate_zero_copy_send();
 
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
@@ -626,15 +626,13 @@ int multifd_send_sync_main(QEMUFile *f)
         p->packet_num = multifd_send_state->packet_num++;
         p->flags |= MULTIFD_FLAG_SYNC;
         p->pending_job++;
-        qemu_file_acct_rate_limit(f, p->packet_len);
-        ram_counters.multifd_bytes += p->packet_len;
-        stat64_add(&ram_atomic_counters.transferred, p->packet_len);
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_post(&p->sem);
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
+        qemu_sem_wait(&multifd_send_state->channels_ready);
         trace_multifd_send_sync_main_wait(p->id);
         qemu_sem_wait(&p->sem_sync);
 
@@ -653,7 +651,7 @@ static void *multifd_send_thread(void *opaque)
     MigrationThread *thread = NULL;
     Error *local_err = NULL;
     int ret = 0;
-    bool use_zero_copy_send = migrate_use_zero_copy_send();
+    bool use_zero_copy_send = migrate_zero_copy_send();
 
     thread = MigrationThreadAdd(p->name, qemu_get_thread_id());
 
@@ -668,6 +666,7 @@ static void *multifd_send_thread(void *opaque)
     p->num_packets = 1;
 
     while (true) {
+        qemu_sem_post(&multifd_send_state->channels_ready);
         qemu_sem_wait(&p->sem);
 
         if (qatomic_read(&multifd_send_state->exiting)) {
@@ -677,7 +676,7 @@ static void *multifd_send_thread(void *opaque)
 
         if (p->pending_job) {
             uint64_t packet_num = p->packet_num;
-            uint32_t flags = p->flags;
+            uint32_t flags;
             p->normal_num = 0;
 
             if (use_zero_copy_send) {
@@ -699,6 +698,7 @@ static void *multifd_send_thread(void *opaque)
                 }
             }
             multifd_send_fill_packet(p);
+            flags = p->flags;
             p->flags = 0;
             p->num_packets++;
             p->total_normal_pages += p->normal_num;
@@ -735,7 +735,6 @@ static void *multifd_send_thread(void *opaque)
             if (flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&p->sem_sync);
             }
-            qemu_sem_post(&multifd_send_state->channels_ready);
         } else if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
@@ -821,7 +820,7 @@ static void multifd_tls_channel_connect(MultiFDSendParams *p,
     const char *hostname = s->hostname;
     QIOChannelTLS *tioc;
 
-    tioc = migration_tls_client_create(s, ioc, hostname, errp);
+    tioc = migration_tls_client_create(ioc, hostname, errp);
     if (!tioc) {
         return;
     }
@@ -910,13 +909,9 @@ int multifd_save_setup(Error **errp)
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
     uint8_t i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_multifd()) {
         return 0;
     }
-    if (!migrate_multi_channels_is_allowed()) {
-        error_setg(errp, "multifd is not supported by current protocol");
-        return -1;
-    }
 
     thread_count = migrate_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
@@ -948,7 +943,7 @@ int multifd_save_setup(Error **errp)
         p->page_size = qemu_target_page_size();
         p->page_count = page_count;
 
-        if (migrate_use_zero_copy_send()) {
+        if (migrate_zero_copy_send()) {
             p->write_flags = QIO_CHANNEL_WRITE_FLAG_ZERO_COPY;
         } else {
             p->write_flags = 0;
@@ -1017,26 +1012,33 @@ static void multifd_recv_terminate_threads(Error *err)
     }
 }
 
-int multifd_load_cleanup(Error **errp)
+void multifd_load_shutdown(void)
+{
+    if (migrate_multifd()) {
+        multifd_recv_terminate_threads(NULL);
+    }
+}
+
+void multifd_load_cleanup(void)
 {
     int i;
 
-    if (!migrate_use_multifd() || !migrate_multi_channels_is_allowed()) {
-        return 0;
+    if (!migrate_multifd()) {
+        return;
     }
     multifd_recv_terminate_threads(NULL);
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         if (p->running) {
-            p->quit = true;
             /*
              * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
              * however try to wakeup it without harm in cleanup phase.
              */
             qemu_sem_post(&p->sem_sync);
-            qemu_thread_join(&p->thread);
         }
+
+        qemu_thread_join(&p->thread);
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
@@ -1062,15 +1064,13 @@ int multifd_load_cleanup(Error **errp)
     multifd_recv_state->params = NULL;
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
-
-    return 0;
 }
 
 void multifd_recv_sync_main(void)
 {
     int i;
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_multifd()) {
         return;
     }
     for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -1168,14 +1168,10 @@ int multifd_load_setup(Error **errp)
      * Return successfully if multiFD recv state is already initialised
      * or multiFD is not enabled.
      */
-    if (multifd_recv_state || !migrate_use_multifd()) {
+    if (multifd_recv_state || !migrate_multifd()) {
         return 0;
     }
 
-    if (!migrate_multi_channels_is_allowed()) {
-        error_setg(errp, "multifd is not supported by current protocol");
-        return -1;
-    }
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
@@ -1218,7 +1214,7 @@ bool multifd_recv_all_channels_created(void)
 {
     int thread_count = migrate_multifd_channels();
 
-    if (!migrate_use_multifd()) {
+    if (!migrate_multifd()) {
         return true;
     }