android-x86/kernel.git @ b798daa094bc9723ee8ebb62b0fee8267b55a040
drivers/staging/lustre/lustre/ldlm/ldlm_flock.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003 Hewlett-Packard Development Company LP.
 * Developed under the sponsorship of the US Government under
 * Subcontract No. B514193
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2010, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

/**
 * This file implements the POSIX lock type for Lustre.
 * Its policy properties are the start and end of the extent and the PID.
 *
 * These locks are only handled through the MDS because POSIX semantics
 * require, e.g., that a lock may be only partially released (and thus split
 * into two parts), and that two adjacent locks from the same process may be
 * merged into a single wider lock.
 *
 * Lock modes are mapped as follows:
 * PR and PW for READ and WRITE locks
 * NL to request the release of a portion of the lock
 *
 * These flock locks never time out.
 */

#define DEBUG_SUBSYSTEM S_LDLM

#include "../include/lustre_dlm.h"
#include "../include/obd_support.h"
#include "../include/obd_class.h"
#include "../include/lustre_lib.h"
#include <linux/list.h>
#include "ldlm_internal.h"

int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag);

/**
 * list_for_remaining_safe - iterate over the remaining entries in a list
 *            and safeguard against removal of a list entry.
 * \param pos   the &struct list_head to use as a loop counter. pos MUST
 *            have been initialized prior to using it in this macro.
 * \param n     another &struct list_head to use as temporary storage
 * \param head  the head for your list.
 */
#define list_for_remaining_safe(pos, n, head) \
        for (n = pos->next; pos != (head); pos = n, n = pos->next)

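/* Two flock locks belong to the same owner if they carry the same flock
 * owner id and were created through the same export. */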
static inline int
ldlm_same_flock_owner(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.owner ==
                lock->l_policy_data.l_flock.owner) &&
               (new->l_export == lock->l_export));
}

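/* Two flock extents overlap if neither one ends before the other starts. */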
static inline int
ldlm_flocks_overlap(struct ldlm_lock *lock, struct ldlm_lock *new)
{
        return((new->l_policy_data.l_flock.start <=
                lock->l_policy_data.l_flock.end) &&
               (new->l_policy_data.l_flock.end >=
                lock->l_policy_data.l_flock.start));
}

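/* Record which owner \a req is blocked by and add \a req to the per-export
 * hash used for deadlock detection. No-op on the client (no export). */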
static inline void ldlm_flock_blocking_link(struct ldlm_lock *req,
                                            struct ldlm_lock *lock)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        LASSERT(hlist_unhashed(&req->l_exp_flock_hash));

        req->l_policy_data.l_flock.blocking_owner =
                lock->l_policy_data.l_flock.owner;
        req->l_policy_data.l_flock.blocking_export =
                lock->l_export;
        req->l_policy_data.l_flock.blocking_refs = 0;

        cfs_hash_add(req->l_export->exp_flock_hash,
                     &req->l_policy_data.l_flock.owner,
                     &req->l_exp_flock_hash);
}

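/* Remove \a req from the per-export deadlock detection hash, if it is
 * hashed there. No-op on the client (no export). */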
static inline void ldlm_flock_blocking_unlink(struct ldlm_lock *req)
{
        /* For server only */
        if (req->l_export == NULL)
                return;

        check_res_locked(req->l_resource);
        if (req->l_export->exp_flock_hash != NULL &&
            !hlist_unhashed(&req->l_exp_flock_hash))
                cfs_hash_del(req->l_export->exp_flock_hash,
                             &req->l_policy_data.l_flock.owner,
                             &req->l_exp_flock_hash);
}

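/* Unlink \a lock from its resource queue and destroy it. In the client-side
 * LDLM_FL_WAIT_NOREPROC case the lock reference is dropped here as well, and
 * the lock is flagged local-only so that no CANCEL RPC is sent. */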
static inline void
ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
{
        LDLM_DEBUG(lock, "ldlm_flock_destroy(mode: %d, flags: 0x%llx)",
                   mode, flags);

        /* Safe to not lock here, since it should be empty anyway */
        LASSERT(hlist_unhashed(&lock->l_exp_flock_hash));

        list_del_init(&lock->l_res_link);
        if (flags == LDLM_FL_WAIT_NOREPROC &&
            !(lock->l_flags & LDLM_FL_FAILED)) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;

                /* when reaching here, it is under lock_res_and_lock(), so we
                 * need to call the nolock version of
                 * ldlm_lock_decref_internal */
                ldlm_lock_decref_internal_nolock(lock, mode);
        }

        ldlm_lock_destroy_nolock(lock);
}

/**
 * POSIX locks deadlock detection code.
 *
 * Given a new lock \a req and an existing lock \a bl_lock it conflicts
 * with, we need to iterate through all blocked POSIX locks for this
 * export and see if there is a deadlock condition arising (i.e. when
 * one client holds a lock on something and wants a lock on something
 * else, while at the same time another client has the opposite situation).
 */
static int
ldlm_flock_deadlock(struct ldlm_lock *req, struct ldlm_lock *bl_lock)
{
        struct obd_export *req_exp = req->l_export;
        struct obd_export *bl_exp = bl_lock->l_export;
        __u64 req_owner = req->l_policy_data.l_flock.owner;
        __u64 bl_owner = bl_lock->l_policy_data.l_flock.owner;

        /* For server only */
        if (req_exp == NULL)
                return 0;

        class_export_get(bl_exp);
        while (1) {
                struct obd_export *bl_exp_new;
                struct ldlm_lock *lock = NULL;
                struct ldlm_flock *flock;

                if (bl_exp->exp_flock_hash != NULL)
                        lock = cfs_hash_lookup(bl_exp->exp_flock_hash,
                                               &bl_owner);
                if (lock == NULL)
                        break;

                LASSERT(req != lock);
                flock = &lock->l_policy_data.l_flock;
                LASSERT(flock->owner == bl_owner);
                bl_owner = flock->blocking_owner;
                bl_exp_new = class_export_get(flock->blocking_export);
                class_export_put(bl_exp);

                cfs_hash_put(bl_exp->exp_flock_hash, &lock->l_exp_flock_hash);
                bl_exp = bl_exp_new;

                if (bl_owner == req_owner && bl_exp == req_exp) {
                        class_export_put(bl_exp);
                        return 1;
                }
        }
        class_export_put(bl_exp);

        return 0;
}

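/* A deadlock was detected for an already-blocked \a lock. If the client
 * advertised OBD_CONNECT_FLOCK_DEAD, flag the lock as deadlocked, unlink it
 * and queue an AST so the lock gets cancelled; otherwise just complain. */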
static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
                                          struct list_head *work_list)
{
        CDEBUG(D_INFO, "reprocess deadlock req=%p\n", lock);

        if ((exp_connect_flags(lock->l_export) &
                                OBD_CONNECT_FLOCK_DEAD) == 0) {
                CERROR(
                      "deadlock found, but client doesn't support flock cancellation\n");
        } else {
                LASSERT(lock->l_completion_ast);
                LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0);
                lock->l_flags |= LDLM_FL_AST_SENT | LDLM_FL_CANCEL_ON_BLOCK |
                        LDLM_FL_FLOCK_DEADLOCK;
                ldlm_flock_blocking_unlink(lock);
                ldlm_resource_unlink_lock(lock);
                ldlm_add_ast_work_item(lock, NULL, work_list);
        }
}

/**
 * Process a granting attempt for flock lock.
 * Must be called with ns lock held.
 *
 * This function looks for any conflicts for \a lock in the granted or
 * waiting queues. The lock is granted if no conflicts are found in
 * either queue.
 *
 * It is also responsible for splitting a lock if a portion of the lock
 * is released.
 *
 * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
 *   - blocking ASTs have already been sent
 *
 * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
 *   - blocking ASTs have not been sent yet, so the list of conflicting
 *     locks would be collected and ASTs sent.
 */
int
ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
                        ldlm_error_t *err, struct list_head *work_list)
{
        struct ldlm_resource *res = req->l_resource;
        struct ldlm_namespace *ns = ldlm_res_to_ns(res);
        struct list_head *tmp;
        struct list_head *ownlocks = NULL;
        struct ldlm_lock *lock = NULL;
        struct ldlm_lock *new = req;
        struct ldlm_lock *new2 = NULL;
        ldlm_mode_t mode = req->l_req_mode;
        int local = ns_is_client(ns);
        int added = (mode == LCK_NL);
        int overlaps = 0;
        int splitted = 0;
        const struct ldlm_callback_suite null_cbs = { NULL };

        CDEBUG(D_DLMTRACE, "flags %#llx owner %llu pid %u mode %u start %llu end %llu\n",
               *flags, new->l_policy_data.l_flock.owner,
               new->l_policy_data.l_flock.pid, mode,
               req->l_policy_data.l_flock.start,
               req->l_policy_data.l_flock.end);

        *err = ELDLM_OK;

        if (local) {
                /* No blocking ASTs are sent to the clients for
                 * Posix file & record locks */
                req->l_blocking_ast = NULL;
        } else {
                /* Called on the server for lock cancels. */
                req->l_blocking_ast = ldlm_flock_blocking_ast;
        }

reprocess:
        if ((*flags == LDLM_FL_WAIT_NOREPROC) || (mode == LCK_NL)) {
                /* This loop determines where this process's locks start
                 * in the resource lr_granted list. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                              l_res_link);
                        if (ldlm_same_flock_owner(lock, req)) {
                                ownlocks = tmp;
                                break;
                        }
                }
        } else {
                int reprocess_failed = 0;

                lockmode_verify(mode);

                /* This loop determines if there are existing locks
                 * that conflict with the new lock request. */
                list_for_each(tmp, &res->lr_granted) {
                        lock = list_entry(tmp, struct ldlm_lock,
                                              l_res_link);

                        if (ldlm_same_flock_owner(lock, req)) {
                                if (!ownlocks)
                                        ownlocks = tmp;
                                continue;
                        }

                        /* locks are compatible, overlap doesn't matter */
                        if (lockmode_compat(lock->l_granted_mode, mode))
                                continue;

                        if (!ldlm_flocks_overlap(lock, req))
                                continue;

                        if (!first_enq) {
                                reprocess_failed = 1;
                                if (ldlm_flock_deadlock(req, lock)) {
                                        ldlm_flock_cancel_on_deadlock(req,
                                                        work_list);
                                        return LDLM_ITER_CONTINUE;
                                }
                                continue;
                        }

                        if (*flags & LDLM_FL_BLOCK_NOWAIT) {
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EAGAIN;
                                return LDLM_ITER_STOP;
                        }

                        if (*flags & LDLM_FL_TEST_LOCK) {
                                ldlm_flock_destroy(req, mode, *flags);
                                req->l_req_mode = lock->l_granted_mode;
                                req->l_policy_data.l_flock.pid =
                                        lock->l_policy_data.l_flock.pid;
                                req->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                                req->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                                *flags |= LDLM_FL_LOCK_CHANGED;
                                return LDLM_ITER_STOP;
                        }

                        /* add lock to blocking list before deadlock
                         * check to prevent race */
                        ldlm_flock_blocking_link(req, lock);

                        if (ldlm_flock_deadlock(req, lock)) {
                                ldlm_flock_blocking_unlink(req);
                                ldlm_flock_destroy(req, mode, *flags);
                                *err = -EDEADLK;
                                return LDLM_ITER_STOP;
                        }

                        ldlm_resource_add_lock(res, &res->lr_waiting, req);
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        return LDLM_ITER_STOP;
                }
                if (reprocess_failed)
                        return LDLM_ITER_CONTINUE;
        }

        if (*flags & LDLM_FL_TEST_LOCK) {
                ldlm_flock_destroy(req, mode, *flags);
                req->l_req_mode = LCK_NL;
                *flags |= LDLM_FL_LOCK_CHANGED;
                return LDLM_ITER_STOP;
        }

        /* In case we had slept on this lock request take it off of the
         * deadlock detection hash list. */
        ldlm_flock_blocking_unlink(req);

        /* Scan the locks owned by this process that overlap this request.
         * We may have to merge or split existing locks. */

        if (!ownlocks)
                ownlocks = &res->lr_granted;

        list_for_remaining_safe(ownlocks, tmp, &res->lr_granted) {
                lock = list_entry(ownlocks, struct ldlm_lock, l_res_link);

                if (!ldlm_same_flock_owner(lock, new))
                        break;

                if (lock->l_granted_mode == mode) {
                        /* If the modes are the same then we need to process
                         * locks that overlap OR adjoin the new lock. The extra
                         * logic condition is necessary to deal with arithmetic
                         * overflow and underflow. */
                        if ((new->l_policy_data.l_flock.start >
                             (lock->l_policy_data.l_flock.end + 1))
                            && (lock->l_policy_data.l_flock.end !=
                                OBD_OBJECT_EOF))
                                continue;

                        if ((new->l_policy_data.l_flock.end <
                             (lock->l_policy_data.l_flock.start - 1))
                            && (lock->l_policy_data.l_flock.start != 0))
                                break;

                        if (new->l_policy_data.l_flock.start <
                            lock->l_policy_data.l_flock.start) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.start;
                        } else {
                                new->l_policy_data.l_flock.start =
                                        lock->l_policy_data.l_flock.start;
                        }

                        if (new->l_policy_data.l_flock.end >
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.end =
                                        new->l_policy_data.l_flock.end;
                        } else {
                                new->l_policy_data.l_flock.end =
                                        lock->l_policy_data.l_flock.end;
                        }

                        if (added) {
                                ldlm_flock_destroy(lock, mode, *flags);
                        } else {
                                new = lock;
                                added = 1;
                        }
                        continue;
                }

                if (new->l_policy_data.l_flock.start >
                    lock->l_policy_data.l_flock.end)
                        continue;

                if (new->l_policy_data.l_flock.end <
                    lock->l_policy_data.l_flock.start)
                        break;

                ++overlaps;

                if (new->l_policy_data.l_flock.start <=
                    lock->l_policy_data.l_flock.start) {
                        if (new->l_policy_data.l_flock.end <
                            lock->l_policy_data.l_flock.end) {
                                lock->l_policy_data.l_flock.start =
                                        new->l_policy_data.l_flock.end + 1;
                                break;
                        }
                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);
                        continue;
                }
                if (new->l_policy_data.l_flock.end >=
                    lock->l_policy_data.l_flock.end) {
                        lock->l_policy_data.l_flock.end =
                                new->l_policy_data.l_flock.start - 1;
                        continue;
                }

                /* split the existing lock into two locks */

                /* if this is an F_UNLCK operation then we could avoid
                 * allocating a new lock and use the req lock passed in
                 * with the request, but this would complicate the reply
                 * processing since updates to req get reflected in the
                 * reply. The client side replays the lock request so
                 * it must see the original lock data in the reply. */

                /* XXX - if ldlm_lock_create() can sleep we should
                 * release the lr_lock, allocate the new lock,
                 * and restart processing this lock. */
                if (!new2) {
                        unlock_res_and_lock(req);
                        new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
                                                lock->l_granted_mode, &null_cbs,
                                                NULL, 0, LVB_T_NONE);
                        lock_res_and_lock(req);
                        if (!new2) {
                                ldlm_flock_destroy(req, lock->l_granted_mode,
                                                   *flags);
                                *err = -ENOLCK;
                                return LDLM_ITER_STOP;
                        }
                        goto reprocess;
                }

                splitted = 1;

                new2->l_granted_mode = lock->l_granted_mode;
                new2->l_policy_data.l_flock.pid =
                        new->l_policy_data.l_flock.pid;
                new2->l_policy_data.l_flock.owner =
                        new->l_policy_data.l_flock.owner;
                new2->l_policy_data.l_flock.start =
                        lock->l_policy_data.l_flock.start;
                new2->l_policy_data.l_flock.end =
                        new->l_policy_data.l_flock.start - 1;
                lock->l_policy_data.l_flock.start =
                        new->l_policy_data.l_flock.end + 1;
                new2->l_conn_export = lock->l_conn_export;
                if (lock->l_export != NULL) {
                        new2->l_export = class_export_lock_get(lock->l_export, new2);
                        if (new2->l_export->exp_lock_hash &&
                            hlist_unhashed(&new2->l_exp_hash))
                                cfs_hash_add(new2->l_export->exp_lock_hash,
                                             &new2->l_remote_handle,
                                             &new2->l_exp_hash);
                }
                if (*flags == LDLM_FL_WAIT_NOREPROC)
                        ldlm_lock_addref_internal_nolock(new2,
                                                         lock->l_granted_mode);

                /* insert new2 at lock */
                ldlm_resource_add_lock(res, ownlocks, new2);
                LDLM_LOCK_RELEASE(new2);
                break;
        }

        /* if new2 is created but never used, destroy it */
        if (splitted == 0 && new2 != NULL)
                ldlm_lock_destroy_nolock(new2);

        /* At this point we're granting the lock request. */
        req->l_granted_mode = req->l_req_mode;

        /* Add req to the granted queue before calling ldlm_reprocess_all(). */
        if (!added) {
                list_del_init(&req->l_res_link);
                /* insert new lock before ownlocks in list. */
                ldlm_resource_add_lock(res, ownlocks, req);
        }

        if (*flags != LDLM_FL_WAIT_NOREPROC) {
                /* The only client-side path into this flock policy function
                 * is ldlm_flock_completion_ast(), which always passes the
                 * LDLM_FL_WAIT_NOREPROC flag. */
                CERROR("Illegal parameter for client-side-only module.\n");
                LBUG();
        }

        /* In case we're reprocessing the requested lock we can't destroy
         * it until after calling ldlm_add_ast_work_item() above so that
         * ldlm_add_ast_work_item() can bump the reference count on \a req.
         * Otherwise \a req could be freed before the completion AST can be
         * sent. */
        if (added)
                ldlm_flock_destroy(req, mode, *flags);

        ldlm_resource_dump(D_INFO, res);
        return LDLM_ITER_CONTINUE;
}

struct ldlm_flock_wait_data {
        struct ldlm_lock *fwd_lock;
        int               fwd_generation;
};

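/* Called if the blocked wait in ldlm_flock_completion_ast() is interrupted:
 * take the lock off the deadlock detection hash and keep it off the LRU. */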
static void
ldlm_flock_interrupted_wait(void *data)
{
        struct ldlm_lock *lock;

        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);

        /* client side - set flag to prevent lock from being put on LRU list */
        lock->l_flags |= LDLM_FL_CBPENDING;
        unlock_res_and_lock(lock);
}

/**
 * Flock completion callback function.
 *
 * \param lock [in,out]: A lock to be handled
 * \param flags    [in]: flags
 * \param *data    [in]: ldlm_work_cp_ast_lock() will use ldlm_cb_set_arg
 *
 * \retval 0    : success
 * \retval <0   : failure
 */
int
ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
{
        struct file_lock            *getlk = lock->l_ast_data;
        struct obd_device           *obd;
        struct obd_import           *imp = NULL;
        struct ldlm_flock_wait_data  fwd;
        struct l_wait_info           lwi;
        ldlm_error_t                 err;
        int                          rc = 0;

        CDEBUG(D_DLMTRACE, "flags: 0x%llx data: %p getlk: %p\n",
               flags, data, getlk);

        /* Import invalidation. We need to actually release the lock
         * references being held, so that it can go away. No point in
         * holding the lock even if app still believes it has it, since
         * server already dropped it anyway. Only for granted locks too. */
        if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
            (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
                if (lock->l_req_mode == lock->l_granted_mode &&
                    lock->l_granted_mode != LCK_NL &&
                    NULL == data)
                        ldlm_lock_decref_internal(lock, lock->l_req_mode);

                /* Need to wake up the waiter if we were evicted */
                wake_up(&lock->l_waitq);
                return 0;
        }

        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);

        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
                        /* mds granted the lock in the reply */
                        goto granted;
                /* CP AST RPC: lock get granted, wake it up */
                wake_up(&lock->l_waitq);
                return 0;
        }

        LDLM_DEBUG(lock,
                   "client-side enqueue returned a blocked lock, sleeping");
        fwd.fwd_lock = lock;
        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, there is no import */
        if (NULL != obd)
                imp = obd->u.cli.cl_import;

        if (NULL != imp) {
                spin_lock(&imp->imp_lock);
                fwd.fwd_generation = imp->imp_generation;
                spin_unlock(&imp->imp_lock);
        }

        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);

        /* Go to sleep until the lock is granted. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                return rc;
        }

granted:
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);

        if (lock->l_flags & LDLM_FL_DESTROYED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                return 0;
        }

        if (lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
                return -EIO;
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                return rc;
        }

        LDLM_DEBUG(lock, "client-side enqueue granted");

        lock_res_and_lock(lock);

        /* take lock off the deadlock detection hash list. */
        ldlm_flock_blocking_unlink(lock);

        /* ldlm_lock_enqueue() has already placed lock on the granted list. */
        list_del_init(&lock->l_res_link);

        if (lock->l_flags & LDLM_FL_FLOCK_DEADLOCK) {
                LDLM_DEBUG(lock, "client-side enqueue deadlock received");
                rc = -EDEADLK;
        } else if (flags & LDLM_FL_TEST_LOCK) {
                /* fcntl(F_GETLK) request */
                /* The old mode was saved in getlk->fl_type so that if the mode
                 * in the lock changes we can decref the appropriate refcount. */
                ldlm_flock_destroy(lock, getlk->fl_type, LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {
                case LCK_PR:
                        getlk->fl_type = F_RDLCK;
                        break;
                case LCK_PW:
                        getlk->fl_type = F_WRLCK;
                        break;
                default:
                        getlk->fl_type = F_UNLCK;
                }
                getlk->fl_pid = (pid_t)lock->l_policy_data.l_flock.pid;
                getlk->fl_start = (loff_t)lock->l_policy_data.l_flock.start;
                getlk->fl_end = (loff_t)lock->l_policy_data.l_flock.end;
        } else {
                __u64 noreproc = LDLM_FL_WAIT_NOREPROC;

                /* We need to reprocess the lock to do merges or splits
                 * with existing locks owned by this process. */
                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);
        }
        unlock_res_and_lock(lock);
        return rc;
}
EXPORT_SYMBOL(ldlm_flock_completion_ast);

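/* Blocking callback for flock locks; only ever invoked for cancellation.
 * All that is needed is to unlink the lock from the deadlock detection
 * hash. */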
int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                            void *data, int flag)
{
        LASSERT(lock);
        LASSERT(flag == LDLM_CB_CANCELING);

        /* take lock off the deadlock detection hash list. */
        lock_res_and_lock(lock);
        ldlm_flock_blocking_unlink(lock);
        unlock_res_and_lock(lock);
        return 0;
}

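/* Convert on-wire flock policy data from pre-2.1 clients, which did not
 * send an owner field, to the local representation. */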
void ldlm_flock_policy_wire18_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        /* Compat code, old clients had no idea about owner field and
         * relied solely on pid for ownership. Introduced in LU-104, 2.1,
         * April 2011 */
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_pid;
}

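/* Convert on-wire flock policy data that carries the owner field (2.1 and
 * later) to the local representation. */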
void ldlm_flock_policy_wire21_to_local(const ldlm_wire_policy_data_t *wpolicy,
                                       ldlm_policy_data_t *lpolicy)
{
        memset(lpolicy, 0, sizeof(*lpolicy));
        lpolicy->l_flock.start = wpolicy->l_flock.lfw_start;
        lpolicy->l_flock.end = wpolicy->l_flock.lfw_end;
        lpolicy->l_flock.pid = wpolicy->l_flock.lfw_pid;
        lpolicy->l_flock.owner = wpolicy->l_flock.lfw_owner;
}

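/* Convert local flock policy data to its on-wire representation. */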
void ldlm_flock_policy_local_to_wire(const ldlm_policy_data_t *lpolicy,
                                     ldlm_wire_policy_data_t *wpolicy)
{
        memset(wpolicy, 0, sizeof(*wpolicy));
        wpolicy->l_flock.lfw_start = lpolicy->l_flock.start;
        wpolicy->l_flock.lfw_end = lpolicy->l_flock.end;
        wpolicy->l_flock.lfw_pid = lpolicy->l_flock.pid;
        wpolicy->l_flock.lfw_owner = lpolicy->l_flock.owner;
}

/*
 * Export handle<->flock hash operations.
 */
static unsigned
ldlm_export_flock_hash(struct cfs_hash *hs, const void *key, unsigned mask)
{
        return cfs_hash_u64_hash(*(__u64 *)key, mask);
}

static void *
ldlm_export_flock_key(struct hlist_node *hnode)
{
        struct ldlm_lock *lock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        return &lock->l_policy_data.l_flock.owner;
}

static int
ldlm_export_flock_keycmp(const void *key, struct hlist_node *hnode)
{
        return !memcmp(ldlm_export_flock_key(hnode), key, sizeof(__u64));
}

static void *
ldlm_export_flock_object(struct hlist_node *hnode)
{
        return hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
}

static void
ldlm_export_flock_get(struct cfs_hash *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_GET(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_get(flock->blocking_export);
        flock->blocking_refs++;
}

static void
ldlm_export_flock_put(struct cfs_hash *hs, struct hlist_node *hnode)
{
        struct ldlm_lock *lock;
        struct ldlm_flock *flock;

        lock = hlist_entry(hnode, struct ldlm_lock, l_exp_flock_hash);
        LDLM_LOCK_RELEASE(lock);

        flock = &lock->l_policy_data.l_flock;
        LASSERT(flock->blocking_export != NULL);
        class_export_put(flock->blocking_export);
        if (--flock->blocking_refs == 0) {
                flock->blocking_owner = 0;
                flock->blocking_export = NULL;
        }
}

static cfs_hash_ops_t ldlm_export_flock_ops = {
        .hs_hash        = ldlm_export_flock_hash,
        .hs_key         = ldlm_export_flock_key,
        .hs_keycmp      = ldlm_export_flock_keycmp,
        .hs_object      = ldlm_export_flock_object,
        .hs_get         = ldlm_export_flock_get,
        .hs_put         = ldlm_export_flock_put,
        .hs_put_locked  = ldlm_export_flock_put,
};

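/* Create the per-export flock hash used for deadlock detection. Only MDT
 * exports need it; exports of other obd types return immediately. */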
int ldlm_init_flock_export(struct obd_export *exp)
{
        if (strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDT_NAME) != 0)
                return 0;

        exp->exp_flock_hash =
                cfs_hash_create(obd_uuid2str(&exp->exp_client_uuid),
                                HASH_EXP_LOCK_CUR_BITS,
                                HASH_EXP_LOCK_MAX_BITS,
                                HASH_EXP_LOCK_BKT_BITS, 0,
                                CFS_HASH_MIN_THETA, CFS_HASH_MAX_THETA,
                                &ldlm_export_flock_ops,
                                CFS_HASH_DEFAULT | CFS_HASH_NBLK_CHANGE);
        if (!exp->exp_flock_hash)
                return -ENOMEM;

        return 0;
}
EXPORT_SYMBOL(ldlm_init_flock_export);

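/* Release the per-export flock hash created by ldlm_init_flock_export(). */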
void ldlm_destroy_flock_export(struct obd_export *exp)
{
        if (exp->exp_flock_hash) {
                cfs_hash_putref(exp->exp_flock_hash);
                exp->exp_flock_hash = NULL;
        }
}
EXPORT_SYMBOL(ldlm_destroy_flock_export);