OSDN Git Service

965f45dbe17bfd4825b9f1dc43988cb4fc89718a
[tomoyo/tomoyo-test1.git] / fs / ocfs2 / dlm / dlmconvert.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* -*- mode: c; c-basic-offset: 8; -*-
3  * vim: noexpandtab sw=8 ts=8 sts=0:
4  *
5  * dlmconvert.c
6  *
7  * underlying calls for lock conversion
8  *
9  * Copyright (C) 2004 Oracle.  All rights reserved.
10  */
11
12
13 #include <linux/module.h>
14 #include <linux/fs.h>
15 #include <linux/types.h>
16 #include <linux/highmem.h>
17 #include <linux/init.h>
18 #include <linux/sysctl.h>
19 #include <linux/random.h>
20 #include <linux/blkdev.h>
21 #include <linux/socket.h>
22 #include <linux/inet.h>
23 #include <linux/spinlock.h>
24
25
26 #include "cluster/heartbeat.h"
27 #include "cluster/nodemanager.h"
28 #include "cluster/tcp.h"
29
30 #include "dlmapi.h"
31 #include "dlmcommon.h"
32
33 #include "dlmconvert.h"
34
35 #define MLOG_MASK_PREFIX ML_DLM
36 #include "cluster/masklog.h"
37
38 /* NOTE: __dlmconvert_master is the only function in here that
39  * needs a spinlock held on entry (res->spinlock) and it is the
40  * only one that holds a lock on exit (res->spinlock).
41  * All other functions in here need no locks and drop all of
42  * the locks that they acquire. */
43 static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm,
44                                            struct dlm_lock_resource *res,
45                                            struct dlm_lock *lock, int flags,
46                                            int type, int *call_ast,
47                                            int *kick_thread);
48 static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
49                                            struct dlm_lock_resource *res,
50                                            struct dlm_lock *lock, int flags, int type);
51
52 /*
53  * this is only called directly by dlmlock(), and only when the
54  * local node is the owner of the lockres
55  * locking:
56  *   caller needs:  none
57  *   taken:         takes and drops res->spinlock
58  *   held on exit:  none
59  * returns: see __dlmconvert_master
60  */
61 enum dlm_status dlmconvert_master(struct dlm_ctxt *dlm,
62                                   struct dlm_lock_resource *res,
63                                   struct dlm_lock *lock, int flags, int type)
64 {
65         int call_ast = 0, kick_thread = 0;
66         enum dlm_status status;
67
68         spin_lock(&res->spinlock);
69         /* we are not in a network handler, this is fine */
70         __dlm_wait_on_lockres(res);
71         __dlm_lockres_reserve_ast(res);
72         res->state |= DLM_LOCK_RES_IN_PROGRESS;
73
74         status = __dlmconvert_master(dlm, res, lock, flags, type,
75                                      &call_ast, &kick_thread);
76
77         res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
78         spin_unlock(&res->spinlock);
79         wake_up(&res->wq);
80         if (status != DLM_NORMAL && status != DLM_NOTQUEUED)
81                 dlm_error(status);
82
83         /* either queue the ast or release it */
84         if (call_ast)
85                 dlm_queue_ast(dlm, lock);
86         else
87                 dlm_lockres_release_ast(dlm, res);
88
89         if (kick_thread)
90                 dlm_kick_thread(dlm, res);
91
92         return status;
93 }
94
95 /* performs lock conversion at the lockres master site
96  * locking:
97  *   caller needs:  res->spinlock
98  *   taken:         takes and drops lock->spinlock
99  *   held on exit:  res->spinlock
100  * returns: DLM_NORMAL, DLM_NOTQUEUED, DLM_DENIED
101  *   call_ast: whether ast should be called for this lock
102  *   kick_thread: whether dlm_kick_thread should be called
103  */
104 static enum dlm_status __dlmconvert_master(struct dlm_ctxt *dlm,
105                                            struct dlm_lock_resource *res,
106                                            struct dlm_lock *lock, int flags,
107                                            int type, int *call_ast,
108                                            int *kick_thread)
109 {
110         enum dlm_status status = DLM_NORMAL;
111         struct dlm_lock *tmplock=NULL;
112
113         assert_spin_locked(&res->spinlock);
114
115         mlog(0, "type=%d, convert_type=%d, new convert_type=%d\n",
116              lock->ml.type, lock->ml.convert_type, type);
117
118         spin_lock(&lock->spinlock);
119
120         /* already converting? */
121         if (lock->ml.convert_type != LKM_IVMODE) {
122                 mlog(ML_ERROR, "attempted to convert a lock with a lock "
123                      "conversion pending\n");
124                 status = DLM_DENIED;
125                 goto unlock_exit;
126         }
127
128         /* must be on grant queue to convert */
129         if (!dlm_lock_on_list(&res->granted, lock)) {
130                 mlog(ML_ERROR, "attempted to convert a lock not on grant "
131                      "queue\n");
132                 status = DLM_DENIED;
133                 goto unlock_exit;
134         }
135
136         if (flags & LKM_VALBLK) {
137                 switch (lock->ml.type) {
138                         case LKM_EXMODE:
139                                 /* EX + LKM_VALBLK + convert == set lvb */
140                                 mlog(0, "will set lvb: converting %s->%s\n",
141                                      dlm_lock_mode_name(lock->ml.type),
142                                      dlm_lock_mode_name(type));
143                                 lock->lksb->flags |= DLM_LKSB_PUT_LVB;
144                                 break;
145                         case LKM_PRMODE:
146                         case LKM_NLMODE:
147                                 /* refetch if new level is not NL */
148                                 if (type > LKM_NLMODE) {
149                                         mlog(0, "will fetch new value into "
150                                              "lvb: converting %s->%s\n",
151                                              dlm_lock_mode_name(lock->ml.type),
152                                              dlm_lock_mode_name(type));
153                                         lock->lksb->flags |= DLM_LKSB_GET_LVB;
154                                 } else {
155                                         mlog(0, "will NOT fetch new value "
156                                              "into lvb: converting %s->%s\n",
157                                              dlm_lock_mode_name(lock->ml.type),
158                                              dlm_lock_mode_name(type));
159                                         flags &= ~(LKM_VALBLK);
160                                 }
161                                 break;
162                 }
163         }
164
165
166         /* in-place downconvert? */
167         if (type <= lock->ml.type)
168                 goto grant;
169
170         /* upconvert from here on */
171         status = DLM_NORMAL;
172         list_for_each_entry(tmplock, &res->granted, list) {
173                 if (tmplock == lock)
174                         continue;
175                 if (!dlm_lock_compatible(tmplock->ml.type, type))
176                         goto switch_queues;
177         }
178
179         list_for_each_entry(tmplock, &res->converting, list) {
180                 if (!dlm_lock_compatible(tmplock->ml.type, type))
181                         goto switch_queues;
182                 /* existing conversion requests take precedence */
183                 if (!dlm_lock_compatible(tmplock->ml.convert_type, type))
184                         goto switch_queues;
185         }
186
187         /* fall thru to grant */
188
189 grant:
190         mlog(0, "res %.*s, granting %s lock\n", res->lockname.len,
191              res->lockname.name, dlm_lock_mode_name(type));
192         /* immediately grant the new lock type */
193         lock->lksb->status = DLM_NORMAL;
194         if (lock->ml.node == dlm->node_num)
195                 mlog(0, "doing in-place convert for nonlocal lock\n");
196         lock->ml.type = type;
197         if (lock->lksb->flags & DLM_LKSB_PUT_LVB)
198                 memcpy(res->lvb, lock->lksb->lvb, DLM_LVB_LEN);
199
200         /*
201          * Move the lock to the tail because it may be the only lock which has
202          * an invalid lvb.
203          */
204         list_move_tail(&lock->list, &res->granted);
205
206         status = DLM_NORMAL;
207         *call_ast = 1;
208         goto unlock_exit;
209
210 switch_queues:
211         if (flags & LKM_NOQUEUE) {
212                 mlog(0, "failed to convert NOQUEUE lock %.*s from "
213                      "%d to %d...\n", res->lockname.len, res->lockname.name,
214                      lock->ml.type, type);
215                 status = DLM_NOTQUEUED;
216                 goto unlock_exit;
217         }
218         mlog(0, "res %.*s, queueing...\n", res->lockname.len,
219              res->lockname.name);
220
221         lock->ml.convert_type = type;
222         /* do not alter lock refcount.  switching lists. */
223         list_move_tail(&lock->list, &res->converting);
224
225 unlock_exit:
226         spin_unlock(&lock->spinlock);
227         if (status == DLM_DENIED) {
228                 __dlm_print_one_lock_resource(res);
229         }
230         if (status == DLM_NORMAL)
231                 *kick_thread = 1;
232         return status;
233 }
234
235 void dlm_revert_pending_convert(struct dlm_lock_resource *res,
236                                 struct dlm_lock *lock)
237 {
238         /* do not alter lock refcount.  switching lists. */
239         list_move_tail(&lock->list, &res->granted);
240         lock->ml.convert_type = LKM_IVMODE;
241         lock->lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
242 }
243
244 /* messages the master site to do lock conversion
245  * locking:
246  *   caller needs:  none
247  *   taken:         takes and drops res->spinlock, uses DLM_LOCK_RES_IN_PROGRESS
248  *   held on exit:  none
249  * returns: DLM_NORMAL, DLM_RECOVERING, status from remote node
250  */
251 enum dlm_status dlmconvert_remote(struct dlm_ctxt *dlm,
252                                   struct dlm_lock_resource *res,
253                                   struct dlm_lock *lock, int flags, int type)
254 {
255         enum dlm_status status;
256
257         mlog(0, "type=%d, convert_type=%d, busy=%d\n", lock->ml.type,
258              lock->ml.convert_type, res->state & DLM_LOCK_RES_IN_PROGRESS);
259
260         spin_lock(&res->spinlock);
261         if (res->state & DLM_LOCK_RES_RECOVERING) {
262                 mlog(0, "bailing out early since res is RECOVERING "
263                      "on secondary queue\n");
264                 /* __dlm_print_one_lock_resource(res); */
265                 status = DLM_RECOVERING;
266                 goto bail;
267         }
268         /* will exit this call with spinlock held */
269         __dlm_wait_on_lockres(res);
270
271         if (lock->ml.convert_type != LKM_IVMODE) {
272                 __dlm_print_one_lock_resource(res);
273                 mlog(ML_ERROR, "converting a remote lock that is already "
274                      "converting! (cookie=%u:%llu, conv=%d)\n",
275                      dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
276                      dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
277                      lock->ml.convert_type);
278                 status = DLM_DENIED;
279                 goto bail;
280         }
281
282         if (lock->ml.type == type && lock->ml.convert_type == LKM_IVMODE) {
283                 mlog(0, "last convert request returned DLM_RECOVERING, but "
284                      "owner has already queued and sent ast to me. res %.*s, "
285                      "(cookie=%u:%llu, type=%d, conv=%d)\n",
286                      res->lockname.len, res->lockname.name,
287                      dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
288                      dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
289                      lock->ml.type, lock->ml.convert_type);
290                 status = DLM_NORMAL;
291                 goto bail;
292         }
293
294         res->state |= DLM_LOCK_RES_IN_PROGRESS;
295         /* move lock to local convert queue */
296         /* do not alter lock refcount.  switching lists. */
297         list_move_tail(&lock->list, &res->converting);
298         lock->convert_pending = 1;
299         lock->ml.convert_type = type;
300
301         if (flags & LKM_VALBLK) {
302                 if (lock->ml.type == LKM_EXMODE) {
303                         flags |= LKM_PUT_LVB;
304                         lock->lksb->flags |= DLM_LKSB_PUT_LVB;
305                 } else {
306                         if (lock->ml.convert_type == LKM_NLMODE)
307                                 flags &= ~LKM_VALBLK;
308                         else {
309                                 flags |= LKM_GET_LVB;
310                                 lock->lksb->flags |= DLM_LKSB_GET_LVB;
311                         }
312                 }
313         }
314         spin_unlock(&res->spinlock);
315
316         /* no locks held here.
317          * need to wait for a reply as to whether it got queued or not. */
318         status = dlm_send_remote_convert_request(dlm, res, lock, flags, type);
319
320         spin_lock(&res->spinlock);
321         res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
322         /* if it failed, move it back to granted queue.
323          * if master returns DLM_NORMAL and then down before sending ast,
324          * it may have already been moved to granted queue, reset to
325          * DLM_RECOVERING and retry convert */
326         if (status != DLM_NORMAL) {
327                 if (status != DLM_NOTQUEUED)
328                         dlm_error(status);
329                 dlm_revert_pending_convert(res, lock);
330         } else if (!lock->convert_pending) {
331                 mlog(0, "%s: res %.*s, owner died and lock has been moved back "
332                                 "to granted list, retry convert.\n",
333                                 dlm->name, res->lockname.len, res->lockname.name);
334                 status = DLM_RECOVERING;
335         }
336
337         lock->convert_pending = 0;
338 bail:
339         spin_unlock(&res->spinlock);
340
341         /* TODO: should this be a wake_one? */
342         /* wake up any IN_PROGRESS waiters */
343         wake_up(&res->wq);
344
345         return status;
346 }
347
348 /* sends DLM_CONVERT_LOCK_MSG to master site
349  * locking:
350  *   caller needs:  none
351  *   taken:         none
352  *   held on exit:  none
353  * returns: DLM_NOLOCKMGR, status from remote node
354  */
355 static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
356                                            struct dlm_lock_resource *res,
357                                            struct dlm_lock *lock, int flags, int type)
358 {
359         struct dlm_convert_lock convert;
360         int tmpret;
361         enum dlm_status ret;
362         int status = 0;
363         struct kvec vec[2];
364         size_t veclen = 1;
365
366         mlog(0, "%.*s\n", res->lockname.len, res->lockname.name);
367
368         memset(&convert, 0, sizeof(struct dlm_convert_lock));
369         convert.node_idx = dlm->node_num;
370         convert.requested_type = type;
371         convert.cookie = lock->ml.cookie;
372         convert.namelen = res->lockname.len;
373         convert.flags = cpu_to_be32(flags);
374         memcpy(convert.name, res->lockname.name, convert.namelen);
375
376         vec[0].iov_len = sizeof(struct dlm_convert_lock);
377         vec[0].iov_base = &convert;
378
379         if (flags & LKM_PUT_LVB) {
380                 /* extra data to send if we are updating lvb */
381                 vec[1].iov_len = DLM_LVB_LEN;
382                 vec[1].iov_base = lock->lksb->lvb;
383                 veclen++;
384         }
385
386         tmpret = o2net_send_message_vec(DLM_CONVERT_LOCK_MSG, dlm->key,
387                                         vec, veclen, res->owner, &status);
388         if (tmpret >= 0) {
389                 // successfully sent and received
390                 ret = status;  // this is already a dlm_status
391                 if (ret == DLM_RECOVERING) {
392                         mlog(0, "node %u returned DLM_RECOVERING from convert "
393                              "message!\n", res->owner);
394                 } else if (ret == DLM_MIGRATING) {
395                         mlog(0, "node %u returned DLM_MIGRATING from convert "
396                              "message!\n", res->owner);
397                 } else if (ret == DLM_FORWARD) {
398                         mlog(0, "node %u returned DLM_FORWARD from convert "
399                              "message!\n", res->owner);
400                 } else if (ret != DLM_NORMAL && ret != DLM_NOTQUEUED)
401                         dlm_error(ret);
402         } else {
403                 mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
404                      "node %u\n", tmpret, DLM_CONVERT_LOCK_MSG, dlm->key,
405                      res->owner);
406                 if (dlm_is_host_down(tmpret)) {
407                         /* instead of logging the same network error over
408                          * and over, sleep here and wait for the heartbeat
409                          * to notice the node is dead.  times out after 5s. */
410                         dlm_wait_for_node_death(dlm, res->owner,
411                                                 DLM_NODE_DEATH_WAIT_MAX);
412                         ret = DLM_RECOVERING;
413                         mlog(0, "node %u died so returning DLM_RECOVERING "
414                              "from convert message!\n", res->owner);
415                 } else {
416                         ret = dlm_err_to_dlm_status(tmpret);
417                 }
418         }
419
420         return ret;
421 }
422
423 /* handler for DLM_CONVERT_LOCK_MSG on master site
424  * locking:
425  *   caller needs:  none
426  *   taken:         takes and drop res->spinlock
427  *   held on exit:  none
428  * returns: DLM_NORMAL, DLM_IVLOCKID, DLM_BADARGS,
429  *          status from __dlmconvert_master
430  */
431 int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data,
432                              void **ret_data)
433 {
434         struct dlm_ctxt *dlm = data;
435         struct dlm_convert_lock *cnv = (struct dlm_convert_lock *)msg->buf;
436         struct dlm_lock_resource *res = NULL;
437         struct dlm_lock *lock = NULL;
438         struct dlm_lock *tmp_lock;
439         struct dlm_lockstatus *lksb;
440         enum dlm_status status = DLM_NORMAL;
441         u32 flags;
442         int call_ast = 0, kick_thread = 0, ast_reserved = 0, wake = 0;
443
444         if (!dlm_grab(dlm)) {
445                 dlm_error(DLM_REJECTED);
446                 return DLM_REJECTED;
447         }
448
449         mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
450                         "Domain %s not fully joined!\n", dlm->name);
451
452         if (cnv->namelen > DLM_LOCKID_NAME_MAX) {
453                 status = DLM_IVBUFLEN;
454                 dlm_error(status);
455                 goto leave;
456         }
457
458         flags = be32_to_cpu(cnv->flags);
459
460         if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
461              (LKM_PUT_LVB|LKM_GET_LVB)) {
462                 mlog(ML_ERROR, "both PUT and GET lvb specified\n");
463                 status = DLM_BADARGS;
464                 goto leave;
465         }
466
467         mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" :
468              (flags & LKM_GET_LVB ? "get lvb" : "none"));
469
470         status = DLM_IVLOCKID;
471         res = dlm_lookup_lockres(dlm, cnv->name, cnv->namelen);
472         if (!res) {
473                 dlm_error(status);
474                 goto leave;
475         }
476
477         spin_lock(&res->spinlock);
478         status = __dlm_lockres_state_to_status(res);
479         if (status != DLM_NORMAL) {
480                 spin_unlock(&res->spinlock);
481                 dlm_error(status);
482                 goto leave;
483         }
484         list_for_each_entry(tmp_lock, &res->granted, list) {
485                 if (tmp_lock->ml.cookie == cnv->cookie &&
486                     tmp_lock->ml.node == cnv->node_idx) {
487                         lock = tmp_lock;
488                         dlm_lock_get(lock);
489                         break;
490                 }
491         }
492         spin_unlock(&res->spinlock);
493         if (!lock) {
494                 status = DLM_IVLOCKID;
495                 mlog(ML_ERROR, "did not find lock to convert on grant queue! "
496                                "cookie=%u:%llu\n",
497                      dlm_get_lock_cookie_node(be64_to_cpu(cnv->cookie)),
498                      dlm_get_lock_cookie_seq(be64_to_cpu(cnv->cookie)));
499                 dlm_print_one_lock_resource(res);
500                 goto leave;
501         }
502
503         /* found the lock */
504         lksb = lock->lksb;
505
506         /* see if caller needed to get/put lvb */
507         if (flags & LKM_PUT_LVB) {
508                 BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
509                 lksb->flags |= DLM_LKSB_PUT_LVB;
510                 memcpy(&lksb->lvb[0], &cnv->lvb[0], DLM_LVB_LEN);
511         } else if (flags & LKM_GET_LVB) {
512                 BUG_ON(lksb->flags & (DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));
513                 lksb->flags |= DLM_LKSB_GET_LVB;
514         }
515
516         spin_lock(&res->spinlock);
517         status = __dlm_lockres_state_to_status(res);
518         if (status == DLM_NORMAL) {
519                 __dlm_lockres_reserve_ast(res);
520                 ast_reserved = 1;
521                 res->state |= DLM_LOCK_RES_IN_PROGRESS;
522                 status = __dlmconvert_master(dlm, res, lock, flags,
523                                              cnv->requested_type,
524                                              &call_ast, &kick_thread);
525                 res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
526                 wake = 1;
527         }
528         spin_unlock(&res->spinlock);
529         if (wake)
530                 wake_up(&res->wq);
531
532         if (status != DLM_NORMAL) {
533                 if (status != DLM_NOTQUEUED)
534                         dlm_error(status);
535                 lksb->flags &= ~(DLM_LKSB_GET_LVB|DLM_LKSB_PUT_LVB);
536         }
537
538 leave:
539         if (lock)
540                 dlm_lock_put(lock);
541
542         /* either queue the ast or release it, if reserved */
543         if (call_ast)
544                 dlm_queue_ast(dlm, lock);
545         else if (ast_reserved)
546                 dlm_lockres_release_ast(dlm, res);
547
548         if (kick_thread)
549                 dlm_kick_thread(dlm, res);
550
551         if (res)
552                 dlm_lockres_put(res);
553
554         dlm_put(dlm);
555
556         return status;
557 }