OSDN Git Service

md: recover_bitmaps() can be static
[uclinux-h8/linux.git] / drivers / md / md-cluster.c
1 /*
2  * Copyright (C) 2015, SUSE
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  */
10
11
12 #include <linux/module.h>
13 #include <linux/dlm.h>
14 #include <linux/sched.h>
15 #include <linux/raid/md_p.h>
16 #include "md.h"
17 #include "bitmap.h"
18 #include "md-cluster.h"
19
20 #define LVB_SIZE        64
21 #define NEW_DEV_TIMEOUT 5000
22
/*
 * One DLM lock resource as used by this file: the lock status block plus
 * the bookkeeping dlm_lock_sync()/lockres_free() need to drive it
 * synchronously.
 */
struct dlm_lock_resource {
	dlm_lockspace_t *ls;	/* lockspace the resource lives in */
	struct dlm_lksb lksb;	/* status block; sb_lvbptr holds the LVB if any */
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	struct completion completion; /* completion for synchronized locking */
	void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
	struct mddev *mddev; /* pointing back to mddev. */
};
32
/*
 * A sector range [lo, hi) reported as being resynced by the node in @slot.
 * Entries are linked on md_cluster_info.suspend_list and consulted by
 * area_resyncing().
 */
struct suspend_info {
	int slot;		/* slot of the node doing the resync */
	sector_t lo;		/* first sector of the range */
	sector_t hi;		/* end of the range */
	struct list_head list;	/* link in md_cluster_info.suspend_list */
};
39
/*
 * Resync range as stored in a bitmap lock resource's LVB; both fields are
 * little-endian (see add_resync_info()/read_resync_info()).
 */
struct resync_info {
	__le64 lo;
	__le64 hi;
};
44
45 /* md_cluster_info flags */
46 #define         MD_CLUSTER_WAITING_FOR_NEWDISK          1
47
48
/* Per-array cluster state, allocated in join() and hung off mddev->cluster_info. */
struct md_cluster_info {
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number;		/* slot as reported by DLM (set in recover_done()) */
	struct completion completion;	/* signalled by recover_done() */
	struct dlm_lock_resource *sb_lock;
	struct mutex sb_mutex;
	struct dlm_lock_resource *bitmap_lockres;	/* this node's "bitmapNNNN" resource */
	struct list_head suspend_list;	/* list of struct suspend_info */
	spinlock_t suspend_lock;	/* protects suspend_list */
	struct md_thread *recovery_thread;	/* runs recover_bitmaps() */
	unsigned long recovery_map;	/* bit per slot awaiting bitmap recovery */
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread *recv_thread;	/* runs recv_daemon() */
	struct completion newdisk_completion;	/* signalled by new_disk_ack() */
	unsigned long state;		/* MD_CLUSTER_* flag bits */
};
70
/* Values of cluster_msg.type, dispatched in process_recvd_msg(). */
enum msg_type {
	METADATA_UPDATED = 0,	/* handled by process_metadata_update() */
	RESYNCING,		/* handled by process_suspend_info() */
	NEWDISK,		/* handled by process_add_new_disk() */
};
76
/*
 * Message exchanged between nodes.  __sendmsg() copies this structure
 * verbatim into the MESSAGE lock resource's LVB and recv_daemon() copies it
 * back out, so it must fit in LVB_SIZE bytes.
 * NOTE(review): the fields use native int/sector_t types while senders store
 * cpu_to_le32()/cpu_to_le64() values in them - confirm the intended wire
 * format before changing the layout.
 */
struct cluster_msg {
	int type;	/* enum msg_type */
	int slot;	/* sender's slot, filled in by __sendmsg() */
	/* TODO: Unionize this for smaller footprint */
	sector_t low;	/* RESYNCING: start of range */
	sector_t high;	/* RESYNCING: end of range, 0 means resync finished */
	char uuid[16];	/* NEWDISK: device uuid */
	int raid_slot;	/* NEWDISK: rdev->desc_nr of the new device */
};
86
87 static void sync_ast(void *arg)
88 {
89         struct dlm_lock_resource *res;
90
91         res = (struct dlm_lock_resource *) arg;
92         complete(&res->completion);
93 }
94
95 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
96 {
97         int ret = 0;
98
99         init_completion(&res->completion);
100         ret = dlm_lock(res->ls, mode, &res->lksb,
101                         res->flags, res->name, strlen(res->name),
102                         0, sync_ast, res, res->bast);
103         if (ret)
104                 return ret;
105         wait_for_completion(&res->completion);
106         return res->lksb.sb_status;
107 }
108
109 static int dlm_unlock_sync(struct dlm_lock_resource *res)
110 {
111         return dlm_lock_sync(res, DLM_LOCK_NL);
112 }
113
114 static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
115                 char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
116 {
117         struct dlm_lock_resource *res = NULL;
118         int ret, namelen;
119         struct md_cluster_info *cinfo = mddev->cluster_info;
120
121         res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
122         if (!res)
123                 return NULL;
124         res->ls = cinfo->lockspace;
125         res->mddev = mddev;
126         namelen = strlen(name);
127         res->name = kzalloc(namelen + 1, GFP_KERNEL);
128         if (!res->name) {
129                 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
130                 goto out_err;
131         }
132         strlcpy(res->name, name, namelen + 1);
133         if (with_lvb) {
134                 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
135                 if (!res->lksb.sb_lvbptr) {
136                         pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
137                         goto out_err;
138                 }
139                 res->flags = DLM_LKF_VALBLK;
140         }
141
142         if (bastfn)
143                 res->bast = bastfn;
144
145         res->flags |= DLM_LKF_EXPEDITE;
146
147         ret = dlm_lock_sync(res, DLM_LOCK_NL);
148         if (ret) {
149                 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
150                 goto out_err;
151         }
152         res->flags &= ~DLM_LKF_EXPEDITE;
153         res->flags |= DLM_LKF_CONVERT;
154
155         return res;
156 out_err:
157         kfree(res->lksb.sb_lvbptr);
158         kfree(res->name);
159         kfree(res);
160         return NULL;
161 }
162
163 static void lockres_free(struct dlm_lock_resource *res)
164 {
165         if (!res)
166                 return;
167
168         init_completion(&res->completion);
169         dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res);
170         wait_for_completion(&res->completion);
171
172         kfree(res->name);
173         kfree(res->lksb.sb_lvbptr);
174         kfree(res);
175 }
176
/*
 * Format the 16 bytes at @src as lowercase hex into @dest, inserting '-'
 * before bytes 4, 6, 8 and 10 (8-4-4-4-12 layout).  @dest must have room
 * for 37 bytes including the terminating NUL.  Returns @dest.
 */
static char *pretty_uuid(char *dest, char *src)
{
	char *out = dest;
	int idx;

	for (idx = 0; idx < 16; idx++) {
		switch (idx) {
		case 4:
		case 6:
		case 8:
		case 10:
			out += sprintf(out, "-");
			break;
		default:
			break;
		}
		out += sprintf(out, "%02x", (unsigned char)src[idx]);
	}
	return dest;
}
188
189 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres,
190                 sector_t lo, sector_t hi)
191 {
192         struct resync_info *ri;
193
194         ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
195         ri->lo = cpu_to_le64(lo);
196         ri->hi = cpu_to_le64(hi);
197 }
198
199 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
200 {
201         struct resync_info ri;
202         struct suspend_info *s = NULL;
203         sector_t hi = 0;
204
205         dlm_lock_sync(lockres, DLM_LOCK_CR);
206         memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
207         hi = le64_to_cpu(ri.hi);
208         if (ri.hi > 0) {
209                 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
210                 if (!s)
211                         goto out;
212                 s->hi = hi;
213                 s->lo = le64_to_cpu(ri.lo);
214         }
215         dlm_unlock_sync(lockres);
216 out:
217         return s;
218 }
219
220 static void recover_bitmaps(struct md_thread *thread)
221 {
222         struct mddev *mddev = thread->mddev;
223         struct md_cluster_info *cinfo = mddev->cluster_info;
224         struct dlm_lock_resource *bm_lockres;
225         char str[64];
226         int slot, ret;
227         struct suspend_info *s, *tmp;
228         sector_t lo, hi;
229
230         while (cinfo->recovery_map) {
231                 slot = fls64((u64)cinfo->recovery_map) - 1;
232
233                 /* Clear suspend_area associated with the bitmap */
234                 spin_lock_irq(&cinfo->suspend_lock);
235                 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
236                         if (slot == s->slot) {
237                                 list_del(&s->list);
238                                 kfree(s);
239                         }
240                 spin_unlock_irq(&cinfo->suspend_lock);
241
242                 snprintf(str, 64, "bitmap%04d", slot);
243                 bm_lockres = lockres_init(mddev, str, NULL, 1);
244                 if (!bm_lockres) {
245                         pr_err("md-cluster: Cannot initialize bitmaps\n");
246                         goto clear_bit;
247                 }
248
249                 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
250                 if (ret) {
251                         pr_err("md-cluster: Could not DLM lock %s: %d\n",
252                                         str, ret);
253                         goto clear_bit;
254                 }
255                 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi);
256                 if (ret) {
257                         pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
258                         goto dlm_unlock;
259                 }
260                 if (hi > 0) {
261                         /* TODO:Wait for current resync to get over */
262                         set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
263                         if (lo < mddev->recovery_cp)
264                                 mddev->recovery_cp = lo;
265                         md_check_recovery(mddev);
266                 }
267 dlm_unlock:
268                 dlm_unlock_sync(bm_lockres);
269 clear_bit:
270                 clear_bit(slot, &cinfo->recovery_map);
271         }
272 }
273
/*
 * dlm_lockspace_ops.recover_prep callback (see md_ls_ops).  Intentionally
 * empty: this file does no work at that point.
 */
static void recover_prep(void *arg)
{
}
277
278 static void recover_slot(void *arg, struct dlm_slot *slot)
279 {
280         struct mddev *mddev = arg;
281         struct md_cluster_info *cinfo = mddev->cluster_info;
282
283         pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
284                         mddev->bitmap_info.cluster_name,
285                         slot->nodeid, slot->slot,
286                         cinfo->slot_number);
287         set_bit(slot->slot - 1, &cinfo->recovery_map);
288         if (!cinfo->recovery_thread) {
289                 cinfo->recovery_thread = md_register_thread(recover_bitmaps,
290                                 mddev, "recover");
291                 if (!cinfo->recovery_thread) {
292                         pr_warn("md-cluster: Could not create recovery thread\n");
293                         return;
294                 }
295         }
296         md_wakeup_thread(cinfo->recovery_thread);
297 }
298
299 static void recover_done(void *arg, struct dlm_slot *slots,
300                 int num_slots, int our_slot,
301                 uint32_t generation)
302 {
303         struct mddev *mddev = arg;
304         struct md_cluster_info *cinfo = mddev->cluster_info;
305
306         cinfo->slot_number = our_slot;
307         complete(&cinfo->completion);
308 }
309
/* Recovery callbacks passed to dlm_new_lockspace() in join(). */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};
315
316 /*
317  * The BAST function for the ack lock resource
318  * This function wakes up the receive thread in
319  * order to receive and process the message.
320  */
321 static void ack_bast(void *arg, int mode)
322 {
323         struct dlm_lock_resource *res = (struct dlm_lock_resource *)arg;
324         struct md_cluster_info *cinfo = res->mddev->cluster_info;
325
326         if (mode == DLM_LOCK_EX)
327                 md_wakeup_thread(cinfo->recv_thread);
328 }
329
330 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
331 {
332         struct suspend_info *s, *tmp;
333
334         list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
335                 if (slot == s->slot) {
336                         pr_info("%s:%d Deleting suspend_info: %d\n",
337                                         __func__, __LINE__, slot);
338                         list_del(&s->list);
339                         kfree(s);
340                         break;
341                 }
342 }
343
344 static void remove_suspend_info(struct md_cluster_info *cinfo, int slot)
345 {
346         spin_lock_irq(&cinfo->suspend_lock);
347         __remove_suspend_info(cinfo, slot);
348         spin_unlock_irq(&cinfo->suspend_lock);
349 }
350
351
352 static void process_suspend_info(struct md_cluster_info *cinfo,
353                 int slot, sector_t lo, sector_t hi)
354 {
355         struct suspend_info *s;
356
357         if (!hi) {
358                 remove_suspend_info(cinfo, slot);
359                 return;
360         }
361         s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
362         if (!s)
363                 return;
364         s->slot = slot;
365         s->lo = lo;
366         s->hi = hi;
367         spin_lock_irq(&cinfo->suspend_lock);
368         /* Remove existing entry (if exists) before adding */
369         __remove_suspend_info(cinfo, slot);
370         list_add(&s->list, &cinfo->suspend_list);
371         spin_unlock_irq(&cinfo->suspend_lock);
372 }
373
374 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
375 {
376         char disk_uuid[64];
377         struct md_cluster_info *cinfo = mddev->cluster_info;
378         char event_name[] = "EVENT=ADD_DEVICE";
379         char raid_slot[16];
380         char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
381         int len;
382
383         len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
384         pretty_uuid(disk_uuid + len, cmsg->uuid);
385         snprintf(raid_slot, 16, "RAID_DISK=%d", cmsg->raid_slot);
386         pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
387         init_completion(&cinfo->newdisk_completion);
388         set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
389         kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
390         wait_for_completion_timeout(&cinfo->newdisk_completion,
391                         NEW_DEV_TIMEOUT);
392         clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
393 }
394
395
396 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
397 {
398         struct md_cluster_info *cinfo = mddev->cluster_info;
399
400         md_reload_sb(mddev);
401         dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
402 }
403
404 static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
405 {
406         switch (msg->type) {
407         case METADATA_UPDATED:
408                 pr_info("%s: %d Received message: METADATA_UPDATE from %d\n",
409                         __func__, __LINE__, msg->slot);
410                 process_metadata_update(mddev, msg);
411                 break;
412         case RESYNCING:
413                 pr_info("%s: %d Received message: RESYNCING from %d\n",
414                         __func__, __LINE__, msg->slot);
415                 process_suspend_info(mddev->cluster_info, msg->slot,
416                                 msg->low, msg->high);
417                 break;
418         case NEWDISK:
419                 pr_info("%s: %d Received message: NEWDISK from %d\n",
420                         __func__, __LINE__, msg->slot);
421                 process_add_new_disk(mddev, msg);
422         };
423 }
424
425 /*
426  * thread for receiving message
427  */
428 static void recv_daemon(struct md_thread *thread)
429 {
430         struct md_cluster_info *cinfo = thread->mddev->cluster_info;
431         struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
432         struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
433         struct cluster_msg msg;
434
435         /*get CR on Message*/
436         if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
437                 pr_err("md/raid1:failed to get CR on MESSAGE\n");
438                 return;
439         }
440
441         /* read lvb and wake up thread to process this message_lockres */
442         memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
443         process_recvd_msg(thread->mddev, &msg);
444
445         /*release CR on ack_lockres*/
446         dlm_unlock_sync(ack_lockres);
447         /*up-convert to EX on message_lockres*/
448         dlm_lock_sync(message_lockres, DLM_LOCK_EX);
449         /*get CR on ack_lockres again*/
450         dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
451         /*release CR on message_lockres*/
452         dlm_unlock_sync(message_lockres);
453 }
454
455 /* lock_comm()
456  * Takes the lock on the TOKEN lock resource so no other
457  * node can communicate while the operation is underway.
458  */
459 static int lock_comm(struct md_cluster_info *cinfo)
460 {
461         int error;
462
463         error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
464         if (error)
465                 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
466                                 __func__, __LINE__, error);
467         return error;
468 }
469
470 static void unlock_comm(struct md_cluster_info *cinfo)
471 {
472         dlm_unlock_sync(cinfo->token_lockres);
473 }
474
475 /* __sendmsg()
476  * This function performs the actual sending of the message. This function is
477  * usually called after performing the encompassing operation
478  * The function:
479  * 1. Grabs the message lockresource in EX mode
480  * 2. Copies the message to the message LVB
481  * 3. Downconverts message lockresource to CR
482  * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
483  *    and the other nodes read the message. The thread will wait here until all other
484  *    nodes have released ack lock resource.
485  * 5. Downconvert ack lockresource to CR
486  */
487 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
488 {
489         int error;
490         int slot = cinfo->slot_number - 1;
491
492         cmsg->slot = cpu_to_le32(slot);
493         /*get EX on Message*/
494         error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
495         if (error) {
496                 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
497                 goto failed_message;
498         }
499
500         memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
501                         sizeof(struct cluster_msg));
502         /*down-convert EX to CR on Message*/
503         error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CR);
504         if (error) {
505                 pr_err("md-cluster: failed to convert EX to CR on MESSAGE(%d)\n",
506                                 error);
507                 goto failed_message;
508         }
509
510         /*up-convert CR to EX on Ack*/
511         error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
512         if (error) {
513                 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
514                                 error);
515                 goto failed_ack;
516         }
517
518         /*down-convert EX to CR on Ack*/
519         error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
520         if (error) {
521                 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
522                                 error);
523                 goto failed_ack;
524         }
525
526 failed_ack:
527         dlm_unlock_sync(cinfo->message_lockres);
528 failed_message:
529         return error;
530 }
531
/*
 * Send @cmsg to the other nodes under the TOKEN lock.
 * Returns 0 on success, the lock_comm() error if the token could not be
 * taken (nothing is sent in that case), or the __sendmsg() error.
 */
static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int ret;

	ret = lock_comm(cinfo);
	if (ret)
		return ret;

	ret = __sendmsg(cinfo, cmsg);
	unlock_comm(cinfo);
	return ret;
}
541
542 static int gather_all_resync_info(struct mddev *mddev, int total_slots)
543 {
544         struct md_cluster_info *cinfo = mddev->cluster_info;
545         int i, ret = 0;
546         struct dlm_lock_resource *bm_lockres;
547         struct suspend_info *s;
548         char str[64];
549
550
551         for (i = 0; i < total_slots; i++) {
552                 memset(str, '\0', 64);
553                 snprintf(str, 64, "bitmap%04d", i);
554                 bm_lockres = lockres_init(mddev, str, NULL, 1);
555                 if (!bm_lockres)
556                         return -ENOMEM;
557                 if (i == (cinfo->slot_number - 1))
558                         continue;
559
560                 bm_lockres->flags |= DLM_LKF_NOQUEUE;
561                 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
562                 if (ret == -EAGAIN) {
563                         memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE);
564                         s = read_resync_info(mddev, bm_lockres);
565                         if (s) {
566                                 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
567                                                 __func__, __LINE__,
568                                                 (unsigned long long) s->lo,
569                                                 (unsigned long long) s->hi, i);
570                                 spin_lock_irq(&cinfo->suspend_lock);
571                                 s->slot = i;
572                                 list_add(&s->list, &cinfo->suspend_list);
573                                 spin_unlock_irq(&cinfo->suspend_lock);
574                         }
575                         ret = 0;
576                         lockres_free(bm_lockres);
577                         continue;
578                 }
579                 if (ret)
580                         goto out;
581                 /* TODO: Read the disk bitmap sb and check if it needs recovery */
582                 dlm_unlock_sync(bm_lockres);
583                 lockres_free(bm_lockres);
584         }
585 out:
586         return ret;
587 }
588
589 static int join(struct mddev *mddev, int nodes)
590 {
591         struct md_cluster_info *cinfo;
592         int ret, ops_rv;
593         char str[64];
594
595         if (!try_module_get(THIS_MODULE))
596                 return -ENOENT;
597
598         cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
599         if (!cinfo)
600                 return -ENOMEM;
601
602         init_completion(&cinfo->completion);
603
604         mutex_init(&cinfo->sb_mutex);
605         mddev->cluster_info = cinfo;
606
607         memset(str, 0, 64);
608         pretty_uuid(str, mddev->uuid);
609         ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
610                                 DLM_LSFL_FS, LVB_SIZE,
611                                 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
612         if (ret)
613                 goto err;
614         wait_for_completion(&cinfo->completion);
615         if (nodes <= cinfo->slot_number) {
616                 pr_err("md-cluster: Slot allotted(%d) greater than available slots(%d)", cinfo->slot_number - 1,
617                         nodes);
618                 ret = -ERANGE;
619                 goto err;
620         }
621         cinfo->sb_lock = lockres_init(mddev, "cmd-super",
622                                         NULL, 0);
623         if (!cinfo->sb_lock) {
624                 ret = -ENOMEM;
625                 goto err;
626         }
627         /* Initiate the communication resources */
628         ret = -ENOMEM;
629         cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
630         if (!cinfo->recv_thread) {
631                 pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
632                 goto err;
633         }
634         cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
635         if (!cinfo->message_lockres)
636                 goto err;
637         cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
638         if (!cinfo->token_lockres)
639                 goto err;
640         cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
641         if (!cinfo->ack_lockres)
642                 goto err;
643         cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
644         if (!cinfo->no_new_dev_lockres)
645                 goto err;
646
647         /* get sync CR lock on ACK. */
648         if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
649                 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
650                                 ret);
651         /* get sync CR lock on no-new-dev. */
652         if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
653                 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
654
655
656         pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
657         snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
658         cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
659         if (!cinfo->bitmap_lockres)
660                 goto err;
661         if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
662                 pr_err("Failed to get bitmap lock\n");
663                 ret = -EINVAL;
664                 goto err;
665         }
666
667         INIT_LIST_HEAD(&cinfo->suspend_list);
668         spin_lock_init(&cinfo->suspend_lock);
669
670         ret = gather_all_resync_info(mddev, nodes);
671         if (ret)
672                 goto err;
673
674         return 0;
675 err:
676         lockres_free(cinfo->message_lockres);
677         lockres_free(cinfo->token_lockres);
678         lockres_free(cinfo->ack_lockres);
679         lockres_free(cinfo->no_new_dev_lockres);
680         lockres_free(cinfo->bitmap_lockres);
681         lockres_free(cinfo->sb_lock);
682         if (cinfo->lockspace)
683                 dlm_release_lockspace(cinfo->lockspace, 2);
684         mddev->cluster_info = NULL;
685         kfree(cinfo);
686         module_put(THIS_MODULE);
687         return ret;
688 }
689
690 static int leave(struct mddev *mddev)
691 {
692         struct md_cluster_info *cinfo = mddev->cluster_info;
693
694         if (!cinfo)
695                 return 0;
696         md_unregister_thread(&cinfo->recovery_thread);
697         md_unregister_thread(&cinfo->recv_thread);
698         lockres_free(cinfo->message_lockres);
699         lockres_free(cinfo->token_lockres);
700         lockres_free(cinfo->ack_lockres);
701         lockres_free(cinfo->no_new_dev_lockres);
702         lockres_free(cinfo->sb_lock);
703         lockres_free(cinfo->bitmap_lockres);
704         dlm_release_lockspace(cinfo->lockspace, 2);
705         return 0;
706 }
707
708 /* slot_number(): Returns the MD slot number to use
709  * DLM starts the slot numbers from 1, wheras cluster-md
710  * wants the number to be from zero, so we deduct one
711  */
712 static int slot_number(struct mddev *mddev)
713 {
714         struct md_cluster_info *cinfo = mddev->cluster_info;
715
716         return cinfo->slot_number - 1;
717 }
718
719 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
720 {
721         struct md_cluster_info *cinfo = mddev->cluster_info;
722
723         add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi);
724         /* Re-acquire the lock to refresh LVB */
725         dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
726 }
727
728 static int metadata_update_start(struct mddev *mddev)
729 {
730         return lock_comm(mddev->cluster_info);
731 }
732
733 static int metadata_update_finish(struct mddev *mddev)
734 {
735         struct md_cluster_info *cinfo = mddev->cluster_info;
736         struct cluster_msg cmsg;
737         int ret;
738
739         memset(&cmsg, 0, sizeof(cmsg));
740         cmsg.type = cpu_to_le32(METADATA_UPDATED);
741         ret = __sendmsg(cinfo, &cmsg);
742         unlock_comm(cinfo);
743         return ret;
744 }
745
746 static int metadata_update_cancel(struct mddev *mddev)
747 {
748         struct md_cluster_info *cinfo = mddev->cluster_info;
749
750         return dlm_unlock_sync(cinfo->token_lockres);
751 }
752
753 static int resync_send(struct mddev *mddev, enum msg_type type,
754                 sector_t lo, sector_t hi)
755 {
756         struct md_cluster_info *cinfo = mddev->cluster_info;
757         struct cluster_msg cmsg;
758         int slot = cinfo->slot_number - 1;
759
760         pr_info("%s:%d lo: %llu hi: %llu\n", __func__, __LINE__,
761                         (unsigned long long)lo,
762                         (unsigned long long)hi);
763         resync_info_update(mddev, lo, hi);
764         cmsg.type = cpu_to_le32(type);
765         cmsg.slot = cpu_to_le32(slot);
766         cmsg.low = cpu_to_le64(lo);
767         cmsg.high = cpu_to_le64(hi);
768         return sendmsg(cinfo, &cmsg);
769 }
770
771 static int resync_start(struct mddev *mddev, sector_t lo, sector_t hi)
772 {
773         pr_info("%s:%d\n", __func__, __LINE__);
774         return resync_send(mddev, RESYNCING, lo, hi);
775 }
776
777 static void resync_finish(struct mddev *mddev)
778 {
779         pr_info("%s:%d\n", __func__, __LINE__);
780         resync_send(mddev, RESYNCING, 0, 0);
781 }
782
783 static int area_resyncing(struct mddev *mddev, sector_t lo, sector_t hi)
784 {
785         struct md_cluster_info *cinfo = mddev->cluster_info;
786         int ret = 0;
787         struct suspend_info *s;
788
789         spin_lock_irq(&cinfo->suspend_lock);
790         if (list_empty(&cinfo->suspend_list))
791                 goto out;
792         list_for_each_entry(s, &cinfo->suspend_list, list)
793                 if (hi > s->lo && lo < s->hi) {
794                         ret = 1;
795                         break;
796                 }
797 out:
798         spin_unlock_irq(&cinfo->suspend_lock);
799         return ret;
800 }
801
802 static int add_new_disk_start(struct mddev *mddev, struct md_rdev *rdev)
803 {
804         struct md_cluster_info *cinfo = mddev->cluster_info;
805         struct cluster_msg cmsg;
806         int ret = 0;
807         struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
808         char *uuid = sb->device_uuid;
809
810         memset(&cmsg, 0, sizeof(cmsg));
811         cmsg.type = cpu_to_le32(NEWDISK);
812         memcpy(cmsg.uuid, uuid, 16);
813         cmsg.raid_slot = rdev->desc_nr;
814         lock_comm(cinfo);
815         ret = __sendmsg(cinfo, &cmsg);
816         if (ret)
817                 return ret;
818         cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
819         ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
820         cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
821         /* Some node does not "see" the device */
822         if (ret == -EAGAIN)
823                 ret = -ENOENT;
824         else
825                 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
826         return ret;
827 }
828
829 static int add_new_disk_finish(struct mddev *mddev)
830 {
831         struct cluster_msg cmsg;
832         struct md_cluster_info *cinfo = mddev->cluster_info;
833         int ret;
834         /* Write sb and inform others */
835         md_update_sb(mddev, 1);
836         cmsg.type = METADATA_UPDATED;
837         ret = __sendmsg(cinfo, &cmsg);
838         unlock_comm(cinfo);
839         return ret;
840 }
841
842 static int new_disk_ack(struct mddev *mddev, bool ack)
843 {
844         struct md_cluster_info *cinfo = mddev->cluster_info;
845
846         if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
847                 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
848                 return -EINVAL;
849         }
850
851         if (ack)
852                 dlm_unlock_sync(cinfo->no_new_dev_lockres);
853         complete(&cinfo->newdisk_completion);
854         return 0;
855 }
856
/*
 * Operations table handed to register_md_cluster_operations() in
 * cluster_init(); each member points at the function of the same name in
 * this file.
 */
static struct md_cluster_operations cluster_ops = {
	.join   = join,
	.leave  = leave,
	.slot_number = slot_number,
	.resync_info_update = resync_info_update,
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk_start = add_new_disk_start,
	.add_new_disk_finish = add_new_disk_finish,
	.new_disk_ack = new_disk_ack,
};
872
873 static int __init cluster_init(void)
874 {
875         pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n");
876         pr_info("Registering Cluster MD functions\n");
877         register_md_cluster_operations(&cluster_ops, THIS_MODULE);
878         return 0;
879 }
880
/* Module exit point: withdraw the operations registered by cluster_init(). */
static void cluster_exit(void)
{
	unregister_md_cluster_operations();
	return;
}
885
886 module_init(cluster_init);
887 module_exit(cluster_exit);
888 MODULE_LICENSE("GPL");
889 MODULE_DESCRIPTION("Clustering support for MD");