4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
37 #define DEBUG_SUBSYSTEM S_OSC
39 #include "../../include/linux/libcfs/libcfs.h"
42 #include "../include/lustre_dlm.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre/lustre_user.h"
45 #include "../include/obd_cksum.h"
47 #include "../include/lustre_ha.h"
48 #include "../include/lprocfs_status.h"
49 #include "../include/lustre_debug.h"
50 #include "../include/lustre_param.h"
51 #include "../include/lustre_fid.h"
52 #include "../include/obd_class.h"
53 #include "osc_internal.h"
54 #include "osc_cl_internal.h"
/*
 * Per-request argument block for bulk read/write (BRW) RPCs; presumably
 * consumed by brw_interpret() -- confirm against the full file.
 * NOTE(review): the members on original lines 57-61 and the closing brace
 * are missing from this extract.  osc_shrink_grant_interpret() reads an
 * aa_oa member through this type, so at least that field exists.
 */
56 struct osc_brw_async_args {
/* array of pointers to brw_page descriptors */
62 struct brw_page **aa_ppga;
/* client_obd associated with the request */
63 struct client_obd *aa_cli;
/* list heads; presumably async pages and extents -- TODO confirm */
64 struct list_head aa_oaps;
65 struct list_head aa_exts;
/* capability and cl_req pointers carried with the request */
66 struct obd_capa *aa_ocapa;
67 struct cl_req *aa_clerq;
/*
 * Async argument block used by osc_getattr_async()/osc_getattr_interpret():
 * it carries the caller's obd_info so the interpret callback can fill
 * oi_oa and invoke oi_cb_up.
 * NOTE(review): the closing brace is on a line missing from this extract.
 */
70 struct osc_async_args {
71 struct obd_info *aa_oi;
/*
 * Async argument block shared by the setattr and punch paths (see
 * osc_setattr_async_base(), osc_punch_base(), osc_setattr_interpret()).
 * NOTE(review): only sa_upcall is visible here; the code in this file also
 * uses sa_oa and sa_cookie, whose declarations and the closing brace are on
 * lines missing from this extract.
 */
74 struct osc_setattr_args {
/* completion callback, invoked as sa_upcall(sa_cookie, rc) */
76 obd_enqueue_update_f sa_upcall;
/*
 * Async argument block for OST_SYNC requests (see osc_sync_base() and
 * osc_sync_interpret()).
 * NOTE(review): a fa_cookie member is used by the code but its declaration
 * and the closing brace are on lines missing from this extract.
 */
80 struct osc_fsync_args {
/* caller's obd_info; the reply obdo is copied into fa_oi->oi_oa */
81 struct obd_info *fa_oi;
/* completion callback, invoked as fa_upcall(fa_cookie, rc) */
82 obd_enqueue_update_f fa_upcall;
/*
 * Argument block for lock enqueue requests.
 * NOTE(review): no user of this structure is visible in this extract, and
 * some members plus the closing brace are on missing lines; the member
 * notes below are derived from the declared types only.
 */
86 struct osc_enqueue_args {
/* export the enqueue was issued on */
87 struct obd_export *oa_exp;
/* completion callback */
89 obd_enqueue_update_f oa_upcall;
/* pointers to the lock value block, lock handle and enqueue info */
91 struct ost_lvb *oa_lvb;
92 struct lustre_handle *oa_lockh;
93 struct ldlm_enqueue_info *oa_ei;
/* one-bit flag; presumably marks an asynchronous glimpse lock -- confirm */
94 unsigned int oa_agl:1;
/*
 * Forward declarations.  The definitions of these functions are not part
 * of this extract.
 */
/* takes an array of brw_page pointers and its element count */
97 static void osc_release_ppga(struct brw_page **ppga, u32 count);
/* reply-interpret callback (env, request, opaque data, rc) */
98 static int brw_interpret(const struct lu_env *env,
99 struct ptlrpc_request *req, void *data, int rc);
/* non-static: declared here with external linkage */
100 int osc_cleanup(struct obd_device *obd);
102 /* Pack OSC object metadata for disk storage (LE byte order). */
/*
 * @exp:  export (not referenced in the visible lines)
 * @lmmp: in/out pointer to the packed lov_mds_md buffer
 * @lsm:  in-memory stripe metadata to pack; may be NULL
 *
 * Visible behaviour: the buffer size is always sizeof(**lmmp); an existing
 * *lmmp combined with a NULL @lsm is freed; *lmmp may be allocated; and the
 * object id of @lsm is stored little-endian in (*lmmp)->lmm_oi.
 * NOTE(review): local declarations, the conditions guarding the allocation
 * and the conversion, all return statements and the closing braces are on
 * lines missing from this extract, so the return contract is not described
 * here -- confirm against the full file.
 */
103 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
104 struct lov_stripe_md *lsm)
108 lmm_size = sizeof(**lmmp);
/* buffer present but nothing to pack: release the buffer */
112 if (*lmmp != NULL && lsm == NULL) {
113 OBD_FREE(*lmmp, lmm_size);
/* an lsm whose object id is 0 is handled as a separate, unlikely case */
116 } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
121 OBD_ALLOC(*lmmp, lmm_size);
/* convert the object id from CPU to little-endian byte order */
127 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
132 /* Unpack OSC object metadata from disk storage (LE byte order). */
/*
 * @exp:       export; used for the import's connect data and for log names
 * @lsmp:      in/out pointer to the in-memory stripe metadata
 * @lmm:       packed metadata to unpack; compared against NULL below
 * @lmm_bytes: size in bytes of the buffer behind @lmm
 *
 * Visible behaviour: rejects a too-small buffer and a zero object id with
 * error messages; frees *lsmp (and its single lsm_oinfo[0]) when @lmm is
 * NULL; allocates and initialises *lsmp otherwise; converts lmm_oi to CPU
 * byte order; sets lsm_maxbytes from the server's connect data when
 * OBD_CONNECT_MAXBYTES is set, else to LUSTRE_STRIPE_MAXBYTES.
 * NOTE(review): several guarding conditions, all return statements and the
 * closing braces are on lines missing from this extract.
 * NOTE(review): @lmm_bytes is a signed int compared against sizeof(), which
 * is unsigned; a negative @lmm_bytes would convert to a large unsigned
 * value and pass the size check -- confirm callers never pass one.
 */
133 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
134 struct lov_mds_md *lmm, int lmm_bytes)
137 struct obd_import *imp = class_exp2cliimp(exp);
140 if (lmm_bytes < sizeof(*lmm)) {
141 CERROR("%s: lov_mds_md too small: %d, need %d\n",
142 exp->exp_obd->obd_name, lmm_bytes,
146 /* XXX LOV_MAGIC etc check? */
148 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
149 CERROR("%s: zero lmm_object_id: rc = %d\n",
150 exp->exp_obd->obd_name, -EINVAL);
/* an OSC object always has exactly one stripe */
155 lsm_size = lov_stripe_md_size(1);
/* existing lsm but nothing to unpack: free the lsm and its oinfo */
159 if (*lsmp != NULL && lmm == NULL) {
160 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
161 OBD_FREE(*lsmp, lsm_size);
167 OBD_ALLOC(*lsmp, lsm_size);
168 if (unlikely(*lsmp == NULL))
170 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
/* second allocation failed: undo the first one */
171 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
172 OBD_FREE(*lsmp, lsm_size);
175 loi_init((*lsmp)->lsm_oinfo[0]);
176 } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
181 /* XXX zero *lsmp? */
182 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
185 (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
186 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
188 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
/*
 * Pack a capability into the RMF_CAPA1 field of a request and flag it in
 * the request body.
 * @req:  request whose capsule holds the RMF_CAPA1 buffer
 * @body: OST body; OBD_MD_FLOSSCAPA is set in body->oa.o_valid
 * @capa: opaque pointer, treated as a struct obd_capa
 *
 * NOTE(review): the lines between the declarations and the statements
 * below (presumably a NULL check on @capa and the copy into the buffer) are
 * missing from this extract -- confirm against the full file.
 */
193 static inline void osc_pack_capa(struct ptlrpc_request *req,
194 struct ost_body *body, void *capa)
196 struct obd_capa *oc = (struct obd_capa *)capa;
197 struct lustre_capa *c;
/* locate the client-side capability buffer in the request capsule */
202 c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
205 body->oa.o_valid |= OBD_MD_FLOSSCAPA;
206 DEBUG_CAPA(D_SEC, c, "pack");
/*
 * Fill the RMF_OST_BODY field of @req from @oinfo: the obdo is converted to
 * wire format into body->oa, then the capability oinfo->oi_capa is packed
 * via osc_pack_capa().
 * NOTE(review): the last argument of lustre_set_wire_obdo() and the braces
 * are on lines missing from this extract.
 */
209 static inline void osc_pack_req_body(struct ptlrpc_request *req,
210 struct obd_info *oinfo)
212 struct ost_body *body;
214 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
217 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
219 osc_pack_capa(req, body, oinfo->oi_capa);
/*
 * Adjust the client-side size of a capability field before the request is
 * packed.  The visible call sets the size of @field to 0; the condition
 * guarding it and the third parameter are on lines missing from this
 * extract (callers pass a capability pointer as the third argument, so
 * presumably the size is zeroed when that pointer is NULL -- confirm).
 */
222 static inline void osc_set_capa_size(struct ptlrpc_request *req,
223 const struct req_msg_field *field,
227 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
229 /* it is already calculated as sizeof struct obd_capa */
/*
 * Reply-interpret callback for asynchronous OST_GETATTR requests.
 * @env: execution environment (not referenced in the visible lines)
 * @req: completed request
 * @aa:  async args holding the caller's obd_info
 * @rc:  request completion status
 *
 * When the reply body is available, the wire obdo is unpacked into
 * aa->aa_oi->oi_oa and the block size is advertised as DT_MAX_BRW_SIZE;
 * otherwise o_valid is cleared.  The caller's oi_cb_up callback is then
 * invoked with @rc and its result stored back into rc.
 * NOTE(review): the conditions selecting the branches, the return and the
 * braces are on lines missing from this extract.
 * NOTE(review): the synchronous osc_getattr() sets o_blksize from
 * cli_brw_size() while this path uses DT_MAX_BRW_SIZE -- confirm that the
 * difference is intended.
 */
233 static int osc_getattr_interpret(const struct lu_env *env,
234 struct ptlrpc_request *req,
235 struct osc_async_args *aa, int rc)
237 struct ost_body *body;
242 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
244 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
245 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
246 aa->aa_oi->oi_oa, &body->oa);
248 /* This should really be sent by the OST */
249 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
250 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
252 CDEBUG(D_INFO, "can't unpack ost_body\n");
/* no usable reply: mark every attribute in the caller's obdo invalid */
254 aa->aa_oi->oi_oa->o_valid = 0;
257 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_GETATTR request on a request set.
 * @exp:   export identifying the target import
 * @oinfo: attributes/capability to send
 * @set:   request set the new request is added to
 *
 * The reply is handled by osc_getattr_interpret(), which receives its
 * arguments through the request's rq_async_args area.
 * NOTE(review): error checks, the assignment into the async args, the
 * return statements and the braces are on lines missing from this extract.
 */
261 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
262 struct ptlrpc_request_set *set)
264 struct ptlrpc_request *req;
265 struct osc_async_args *aa;
268 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
/* the capability field size must be fixed before the request is packed */
272 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
273 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
275 ptlrpc_request_free(req);
279 osc_pack_req_body(req, oinfo);
281 ptlrpc_request_set_replen(req);
282 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
/* compile-time check that the args fit in the request's async area */
284 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
285 aa = ptlrpc_req_async_args(req);
288 ptlrpc_set_add_req(set, req);
/*
 * Synchronous OST_GETATTR: build the request, wait for the reply with
 * ptlrpc_queue_wait(), unpack the returned obdo into oinfo->oi_oa and set
 * the block size to cli_brw_size() of the export's device.
 * @env:   execution environment (not referenced in the visible lines)
 * @exp:   export identifying the target import
 * @oinfo: in/out attributes and capability
 *
 * NOTE(review): error checks, the last argument of lustre_get_wire_obdo(),
 * labels, the return statements and the braces are on lines missing from
 * this extract.
 */
292 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
293 struct obd_info *oinfo)
295 struct ptlrpc_request *req;
296 struct ost_body *body;
299 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
303 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
304 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
306 ptlrpc_request_free(req);
310 osc_pack_req_body(req, oinfo);
312 ptlrpc_request_set_replen(req);
/* send the request and block until it completes */
314 rc = ptlrpc_queue_wait(req);
318 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
324 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
325 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
328 oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
329 oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
/* drop the request reference */
332 ptlrpc_req_finished(req);
/*
 * Synchronous OST_SETATTR: build the request from @oinfo, wait for the
 * reply and unpack the returned obdo into oinfo->oi_oa.
 * @env:   execution environment (not referenced in the visible lines)
 * @exp:   export identifying the target import
 * @oinfo: in/out attributes; o_valid must include OBD_MD_FLGROUP
 * @oti:   transaction info (not referenced in the visible lines)
 *
 * NOTE(review): error checks, the last argument of lustre_get_wire_obdo(),
 * labels, the return statements and the braces are on lines missing from
 * this extract.
 */
336 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
337 struct obd_info *oinfo, struct obd_trans_info *oti)
339 struct ptlrpc_request *req;
340 struct ost_body *body;
343 LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
345 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
349 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
350 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
352 ptlrpc_request_free(req);
356 osc_pack_req_body(req, oinfo);
358 ptlrpc_request_set_replen(req);
/* send the request and block until it completes */
360 rc = ptlrpc_queue_wait(req);
364 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
370 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
374 ptlrpc_req_finished(req);
/*
 * Reply-interpret callback shared by the asynchronous setattr and punch
 * requests: unpacks the reply obdo into sa->sa_oa and then calls
 * sa->sa_upcall(sa->sa_cookie, rc), storing the result in rc.
 * @env: execution environment (not referenced in the visible lines)
 * @req: completed request
 * @sa:  async args filled in by the submitting function
 * @rc:  request completion status
 *
 * NOTE(review): the error checks around the body lookup, the last argument
 * of lustre_get_wire_obdo(), the return and the braces are on lines missing
 * from this extract.
 */
378 static int osc_setattr_interpret(const struct lu_env *env,
379 struct ptlrpc_request *req,
380 struct osc_setattr_args *sa, int rc)
382 struct ost_body *body;
387 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
393 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
396 rc = sa->sa_upcall(sa->sa_cookie, rc);
/*
 * Build and submit an asynchronous OST_SETATTR request.
 * @exp:    export identifying the target import
 * @oinfo:  attributes and capability to send
 * @oti:    optional transaction info; supplies the llog cookie when
 *          OBD_MD_FLCOOKIE is set in oinfo->oi_oa->o_valid
 * @upcall: completion callback stored in the async args
 * @cookie: opaque argument passed to @upcall
 * @rqset:  destination set; PTLRPCD_SET hands the request to ptlrpcd
 *
 * Two submission paths are visible: a fire-and-forget path that hands the
 * request to ptlrpcd without an interpret callback, and a path that
 * installs osc_setattr_interpret() and queues the request either on ptlrpcd
 * or on @rqset.
 * NOTE(review): the condition choosing between the two paths (presumably
 * a NULL @rqset), the else keywords, error checks, returns and braces are
 * on lines missing from this extract -- confirm against the full file.
 */
400 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
401 struct obd_trans_info *oti,
402 obd_enqueue_update_f upcall, void *cookie,
403 struct ptlrpc_request_set *rqset)
405 struct ptlrpc_request *req;
406 struct osc_setattr_args *sa;
409 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
413 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
414 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
416 ptlrpc_request_free(req);
/* copy the first llog cookie into the obdo before it is packed */
420 if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
421 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
423 osc_pack_req_body(req, oinfo);
425 ptlrpc_request_set_replen(req);
427 /* do mds to ost setattr asynchronously */
429 /* Do not wait for response. */
430 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
432 req->rq_interpret_reply =
433 (ptlrpc_interpterer_t)osc_setattr_interpret;
/* compile-time check that the args fit in the request's async area */
435 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
436 sa = ptlrpc_req_async_args(req);
437 sa->sa_oa = oinfo->oi_oa;
438 sa->sa_upcall = upcall;
439 sa->sa_cookie = cookie;
441 if (rqset == PTLRPCD_SET)
442 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
444 ptlrpc_set_add_req(rqset, req);
/*
 * Thin wrapper around osc_setattr_async_base() that uses the obd_info's own
 * oi_cb_up as the completion callback and @oinfo itself as its cookie.
 * NOTE(review): the braces are on lines missing from this extract.
 */
450 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
451 struct obd_trans_info *oti,
452 struct ptlrpc_request_set *rqset)
454 return osc_setattr_async_base(exp, oinfo, oti,
455 oinfo->oi_cb_up, oinfo, rqset);
/*
 * Synchronously create an object on the OST with an OST_CREATE request.
 * @exp: export identifying the target import
 * @oa:  in/out object attributes; updated from the reply
 * @ea:  in/out stripe metadata pointer (its use is on lines not visible)
 * @oti: transaction info; receives the reply transno and, when
 *       OBD_MD_FLCOOKIE is valid, the llog cookie
 *
 * Visible flow: allocate in-memory stripe metadata, build and pack the
 * request, mark delorphan requests as no-resend/no-delay, wait for the
 * reply, unpack the obdo, set the block size from cli_brw_size(), copy the
 * object id into the lsm, record transno/cookie in @oti, then release the
 * request and free the lsm.
 * NOTE(review): error checks, labels, conditions around the oti updates,
 * returns and braces are on lines missing from this extract.  The closing
 * delimiter of the "XXX LOV STACKING" comment is also missing, so no
 * further comments are added after that point.
 */
458 int osc_real_create(struct obd_export *exp, struct obdo *oa,
459 struct lov_stripe_md **ea, struct obd_trans_info *oti)
461 struct ptlrpc_request *req;
462 struct ost_body *body;
463 struct lov_stripe_md *lsm;
471 rc = obd_alloc_memmd(exp, &lsm);
476 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
482 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
484 ptlrpc_request_free(req);
488 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
491 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
493 ptlrpc_request_set_replen(req);
/* note: o_flags is compared for equality, not tested as a bit mask */
495 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
496 oa->o_flags == OBD_FL_DELORPHAN) {
498 "delorphan from OST integration");
499 /* Don't resend the delorphan req */
500 req->rq_no_resend = req->rq_no_delay = 1;
503 rc = ptlrpc_queue_wait(req);
507 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
513 CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
514 lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
516 oa->o_blksize = cli_brw_size(exp->exp_obd);
517 oa->o_valid |= OBD_MD_FLBLKSZ;
519 /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
520 * have valid lsm_oinfo data structs, so don't go touching that.
521 * This needs to be fixed in a big way.
523 lsm->lsm_oi = oa->o_oi;
527 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
529 if (oa->o_valid & OBD_MD_FLCOOKIE) {
530 if (!oti->oti_logcookies)
531 oti_alloc_cookies(oti, 1);
532 *oti->oti_logcookies = oa->o_lcookie;
536 CDEBUG(D_HA, "transno: %lld\n",
537 lustre_msg_get_transno(req->rq_repmsg));
539 ptlrpc_req_finished(req);
542 obd_free_memmd(exp, &lsm);
/*
 * Build and submit an asynchronous OST_PUNCH request.
 * @exp:    export identifying the target import
 * @oinfo:  attributes and capability to send
 * @upcall: completion callback stored in the async args
 * @cookie: opaque argument passed to @upcall
 * @rqset:  destination set; PTLRPCD_SET hands the request to ptlrpcd
 *
 * The request is directed to OST_IO_PORTAL and its timeout recomputed; the
 * reply is handled by osc_setattr_interpret().
 * NOTE(review): error checks, the last argument of lustre_set_wire_obdo(),
 * the else keyword, returns and braces are on lines missing from this
 * extract.
 */
546 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
547 obd_enqueue_update_f upcall, void *cookie,
548 struct ptlrpc_request_set *rqset)
550 struct ptlrpc_request *req;
551 struct osc_setattr_args *sa;
552 struct ost_body *body;
555 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
559 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
560 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
562 ptlrpc_request_free(req);
565 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
566 ptlrpc_at_set_req_timeout(req);
568 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
570 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
572 osc_pack_capa(req, body, oinfo->oi_capa);
574 ptlrpc_request_set_replen(req);
576 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
/* compile-time check that the args fit in the request's async area */
577 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
578 sa = ptlrpc_req_async_args(req);
579 sa->sa_oa = oinfo->oi_oa;
580 sa->sa_upcall = upcall;
581 sa->sa_cookie = cookie;
582 if (rqset == PTLRPCD_SET)
583 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
585 ptlrpc_set_add_req(rqset, req);
/*
 * Reply-interpret callback for OST_SYNC requests: copies the reply obdo
 * into the caller's fa->fa_oi->oi_oa by plain structure assignment, then
 * calls fa->fa_upcall(fa->fa_cookie, rc) and stores the result in rc.
 * An unpack failure is reported with CERROR.
 * NOTE(review): the third and fourth parameters (the code uses the names
 * arg and rc), the checks around the body lookup, the return and the braces
 * are on lines missing from this extract.
 */
590 static int osc_sync_interpret(const struct lu_env *env,
591 struct ptlrpc_request *req,
594 struct osc_fsync_args *fa = arg;
595 struct ost_body *body;
600 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
602 CERROR ("can't unpack ost_body\n");
607 *fa->fa_oi->oi_oa = body->oa;
609 rc = fa->fa_upcall(fa->fa_cookie, rc);
/*
 * Build and submit an asynchronous OST_SYNC request.
 * @exp:    export identifying the target import
 * @oinfo:  attributes and capability to send
 * @upcall: completion callback stored in the async args
 * @cookie: opaque argument passed to @upcall
 * @rqset:  destination set; PTLRPCD_SET hands the request to ptlrpcd
 *
 * The reply is handled by osc_sync_interpret().
 * NOTE(review): error checks, the last argument of lustre_set_wire_obdo(),
 * the assignment of fa->fa_oi, the else keyword, returns and braces are on
 * lines missing from this extract.
 */
613 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
614 obd_enqueue_update_f upcall, void *cookie,
615 struct ptlrpc_request_set *rqset)
617 struct ptlrpc_request *req;
618 struct ost_body *body;
619 struct osc_fsync_args *fa;
622 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
626 osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
627 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
629 ptlrpc_request_free(req);
633 /* overload the size and blocks fields in the oa with start/end */
634 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
636 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
638 osc_pack_capa(req, body, oinfo->oi_capa);
640 ptlrpc_request_set_replen(req);
641 req->rq_interpret_reply = osc_sync_interpret;
/* compile-time check that the args fit in the request's async area */
643 CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
644 fa = ptlrpc_req_async_args(req);
646 fa->fa_upcall = upcall;
647 fa->fa_cookie = cookie;
649 if (rqset == PTLRPCD_SET)
650 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
652 ptlrpc_set_add_req(rqset, req);
657 /* Find and locally cancel the locks matching @mode in the resource named by
658 * the object id of @oa. Found locks are added to the @cancels list. Returns
659 * the number of locks added to the @cancels list. */
/*
 * @exp:        export whose device namespace is searched
 * @oa:         object whose o_oi is turned into the resource name
 * @cancels:    list receiving the cancelled locks
 * @mode:       lock mode to match
 * @lock_flags: flags passed through to ldlm_cancel_resource_local()
 *
 * NOTE(review): the early-return statements, the check of the looked-up
 * resource, the final return and the braces are on lines missing from this
 * extract.
 */
660 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
661 struct list_head *cancels,
662 ldlm_mode_t mode, __u64 lock_flags)
664 struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
665 struct ldlm_res_id res_id;
666 struct ldlm_resource *res;
669 /* Return, i.e. cancel nothing, only if ELC is supported (flag in
670 * export) but disabled through procfs (flag in NS).
672 * This distinguishes from a case when ELC is not supported originally,
673 * when we still want to cancel locks in advance and just cancel them
674 * locally, without sending any RPC. */
675 if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
/* look up the resource without creating it */
678 ostid_build_res_name(&oa->o_oi, &res_id);
679 res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
683 LDLM_RESOURCE_ADDREF(res);
684 count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
685 lock_flags, 0, NULL);
686 LDLM_RESOURCE_DELREF(res);
/* drop the reference taken by ldlm_resource_get() */
687 ldlm_resource_putref(res);
/*
 * Reply-interpret callback for OST_DESTROY requests: decrements the
 * client's count of in-flight destroy RPCs and wakes any thread waiting on
 * cl_destroy_waitq (see osc_destroy()).
 * NOTE(review): the last parameter, the return and the braces are on lines
 * missing from this extract.
 */
691 static int osc_destroy_interpret(const struct lu_env *env,
692 struct ptlrpc_request *req, void *data,
695 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
697 atomic_dec(&cli->cl_destroy_in_flight);
698 wake_up(&cli->cl_destroy_waitq);
/*
 * Try to reserve a slot for a destroy RPC.
 *
 * The in-flight counter is incremented optimistically; if the new value is
 * within cl_max_rpcs_in_flight the slot is kept.  Otherwise the increment
 * is undone, and if the value after the decrement is below the limit, the
 * counter changed between the two atomic operations, so waiters are woken
 * to retry.
 * NOTE(review): the return statements, braces and the delimiters of the
 * inner comment are on lines missing from this extract; the return values
 * are therefore not documented here.
 */
702 static int osc_can_send_destroy(struct client_obd *cli)
704 if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
705 cli->cl_max_rpcs_in_flight) {
706 /* The destroy request can be sent */
/* over the limit: give the slot back */
709 if (atomic_dec_return(&cli->cl_destroy_in_flight) <
710 cli->cl_max_rpcs_in_flight) {
712 * The counter has been modified between the two atomic
715 wake_up(&cli->cl_destroy_waitq);
/*
 * Object creation entry point.  Requires OBD_MD_FLGROUP in oa->o_valid and
 * forwards to osc_real_create() in two visible cases: when o_flags equals
 * OBD_FL_RECREATE_OBJS, and when the object's sequence is not an MDT
 * sequence.  Per the in-code comment, falling through both checks is not
 * expected to happen any more.
 * NOTE(review): the remaining LASSERTs, the handling after the final
 * comment, the return and the braces are on lines missing from this
 * extract.
 */
720 int osc_create(const struct lu_env *env, struct obd_export *exp,
721 struct obdo *oa, struct lov_stripe_md **ea,
722 struct obd_trans_info *oti)
728 LASSERT(oa->o_valid & OBD_MD_FLGROUP);
730 if ((oa->o_valid & OBD_MD_FLFLAGS) &&
731 oa->o_flags == OBD_FL_RECREATE_OBJS) {
732 return osc_real_create(exp, oa, ea, oti);
735 if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
736 return osc_real_create(exp, oa, ea, oti);
738 /* we should not get here anymore */
744 /* Destroy requests can be async always on the client, and we don't even really
745 * care about the return code since the client cannot do anything at all about
745 * it.
747 * When the MDS is unlinking a filename, it saves the file objects into a
748 * recovery llog, and these object records are cancelled when the OST reports
749 * they were destroyed and sync'd to disk (i.e. transaction committed).
750 * If the client dies, or the OST is down when the object should be destroyed,
751 * the records are not cancelled, and when the OST reconnects to the MDS next,
752 * it will retrieve the llog unlink logs and then sends the log cancellation
753 * cookies to the MDS after committing destroy transactions. */
/*
 * Visible flow: collect and locally cancel the object's PW locks (with
 * LDLM_FL_DISCARD_DATA), build an OST_DESTROY request carrying those early
 * lock cancels, copy the llog cookie from @oti when OBD_MD_FLCOOKIE is
 * valid, and hand the request to ptlrpcd without waiting for the reply.
 * Unless the obdo carries OBD_FL_DELORPHAN, the caller first waits until
 * osc_can_send_destroy() grants a slot.
 * NOTE(review): the last parameter (used as capa), local declarations,
 * error checks, returns, braces and the delimiters of the "Wait until"
 * comment are on lines missing from this extract.
 */
754 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
755 struct obdo *oa, struct lov_stripe_md *ea,
756 struct obd_trans_info *oti, struct obd_export *md_export,
759 struct client_obd *cli = &exp->exp_obd->u.cli;
760 struct ptlrpc_request *req;
761 struct ost_body *body;
766 CDEBUG(D_INFO, "oa NULL\n");
770 count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
771 LDLM_FL_DISCARD_DATA);
773 req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
/* release the locks gathered above */
775 ldlm_lock_list_put(&cancels, l_bl_ast, count);
779 osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
780 rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
783 ptlrpc_request_free(req);
787 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
788 ptlrpc_at_set_req_timeout(req);
790 if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
791 oa->o_lcookie = *oti->oti_logcookies;
792 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
794 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
796 osc_pack_capa(req, body, (struct obd_capa *)capa);
797 ptlrpc_request_set_replen(req);
799 /* If osc_destroy is for destroying the unlink orphan,
800 * sent from MDT to OST, which should not be blocked here,
801 * because the process might be triggered by ptlrpcd, and
802 * it is not good to block ptlrpcd thread (b=16006)*/
803 if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
804 req->rq_interpret_reply = osc_destroy_interpret;
805 if (!osc_can_send_destroy(cli)) {
806 struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
810 * Wait until the number of on-going destroy RPCs drops
811 * under max_rpc_in_flight
813 l_wait_event_exclusive(cli->cl_destroy_waitq,
814 osc_can_send_destroy(cli), &lwi);
818 /* Do not wait for response */
819 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * Fill the cache/grant accounting fields of @oa (o_dirty, o_undirty,
 * o_grant, o_dropped) from the client's state, under cl_loi_list_lock.
 * @cli: client whose dirty and grant counters are reported
 * @oa:  obdo to fill; must not already have OBD_MD_FLBLOCKS or
 *       OBD_MD_FLGRANT set in o_valid
 *
 * Three sanity checks log an error when the dirty counters look
 * inconsistent; in the normal case o_undirty is the larger of cl_dirty_max
 * and the space needed for (cl_max_rpcs_in_flight + 1) full RPCs.
 * cl_lost_grant is reported in o_dropped and then reset to 0.
 * NOTE(review): the third parameter, the update of o_valid, the o_undirty
 * assignments in the error branches, part of the max_in_flight expression
 * and the braces are on lines missing from this extract.
 */
823 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
826 u32 bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
828 LASSERT(!(oa->o_valid & bits));
831 client_obd_list_lock(&cli->cl_loi_list_lock);
832 oa->o_dirty = cli->cl_dirty;
833 if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
834 cli->cl_dirty_max)) {
835 CERROR("dirty %lu - %lu > dirty_max %lu\n",
836 cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
838 } else if (unlikely(atomic_read(&obd_dirty_pages) -
839 atomic_read(&obd_dirty_transit_pages) >
840 (long)(obd_max_dirty_pages + 1))) {
841 /* The atomic_read() calls and the concurrent atomic_inc() are
842 * not covered by a lock, so they may race and trip
843 * this CERROR() unless we add in a small fudge factor (+1). */
844 CERROR("dirty %d - %d > system dirty_max %d\n",
845 atomic_read(&obd_dirty_pages),
846 atomic_read(&obd_dirty_transit_pages),
847 obd_max_dirty_pages);
849 } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
850 CERROR("dirty %lu - dirty_max %lu too big???\n",
851 cli->cl_dirty, cli->cl_dirty_max);
854 long max_in_flight = (cli->cl_max_pages_per_rpc <<
856 (cli->cl_max_rpcs_in_flight + 1);
857 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
/* grant held by the client: available plus reserved */
859 oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
/* report lost grant once, then reset the counter */
860 oa->o_dropped = cli->cl_lost_grant;
861 cli->cl_lost_grant = 0;
862 client_obd_list_unlock(&cli->cl_loi_list_lock);
863 CDEBUG(D_CACHE, "dirty: %llu undirty: %u dropped %u grant: %llu\n",
864 oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
/*
 * Schedule the next grant-shrink check: cl_next_shrink_grant is set to the
 * value of cfs_time_shift(cl_grant_shrink_interval) and logged.
 * NOTE(review): the braces are on lines missing from this extract.
 */
868 void osc_update_next_shrink(struct client_obd *cli)
870 cli->cl_next_shrink_grant =
871 cfs_time_shift(cli->cl_grant_shrink_interval);
872 CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
873 cli->cl_next_shrink_grant);
/*
 * Add @grant bytes to the client's available grant while holding
 * cl_loi_list_lock.
 * NOTE(review): the braces are on lines missing from this extract.
 */
876 static void __osc_update_grant(struct client_obd *cli, u64 grant)
878 client_obd_list_lock(&cli->cl_loi_list_lock);
879 cli->cl_avail_grant += grant;
880 client_obd_list_unlock(&cli->cl_loi_list_lock);
/*
 * Credit the grant carried in a reply body: when OBD_MD_FLGRANT is set in
 * body->oa.o_valid, body->oa.o_grant is logged and added to the client's
 * available grant.
 * NOTE(review): the braces are on lines missing from this extract.
 */
883 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
885 if (body->oa.o_valid & OBD_MD_FLGRANT) {
886 CDEBUG(D_CACHE, "got %llu extra grant\n", body->oa.o_grant);
887 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Forward declaration: osc_shrink_grant_to_target() calls this function
 * before its definition, which is not part of this extract.
 */
891 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
892 u32 keylen, void *key, u32 vallen,
893 void *val, struct ptlrpc_request_set *set);
/*
 * Reply-interpret callback for a grant-shrink request.  The opaque args
 * are read as a struct osc_brw_async_args to reach the obdo that was sent.
 * Two grant updates are visible: one gives oa->o_grant back to the client
 * and one applies any grant carried in the reply body.
 * NOTE(review): the remaining parameters (the code uses the name aa), the
 * conditions selecting between the two updates (presumably the first runs
 * only when the request failed -- confirm), cleanup, the return and the
 * braces are on lines missing from this extract.
 */
895 static int osc_shrink_grant_interpret(const struct lu_env *env,
896 struct ptlrpc_request *req,
899 struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
900 struct obdo *oa = ((struct osc_brw_async_args *)aa)->aa_oa;
901 struct ost_body *body;
904 __osc_update_grant(cli, oa->o_grant);
908 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
910 osc_update_grant(cli, body);
/*
 * Move a quarter of the available grant out of the client and into
 * oa->o_grant (under cl_loi_list_lock), mark the obdo with
 * OBD_FL_SHRINK_GRANT and reschedule the next shrink check.
 * NOTE(review): the statement that follows the OBD_MD_FLFLAGS test
 * (presumably clearing o_flags, as osc_shrink_grant_to_target() does) and
 * the braces are on lines missing from this extract.
 */
916 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
918 client_obd_list_lock(&cli->cl_loi_list_lock);
919 oa->o_grant = cli->cl_avail_grant / 4;
920 cli->cl_avail_grant -= oa->o_grant;
921 client_obd_list_unlock(&cli->cl_loi_list_lock);
/* make o_flags valid before setting a bit in it */
922 if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
923 oa->o_valid |= OBD_MD_FLFLAGS;
926 oa->o_flags |= OBD_FL_SHRINK_GRANT;
927 osc_update_next_shrink(cli);
930 /* Shrink the current grant, either from some large amount to enough for a
931 * full set of in-flight RPCs, or if we have already shrunk to that limit
932 * then to enough for a single RPC. This avoids keeping more grant than
933 * needed, and avoids shrinking the grant piecemeal. */
/*
 * The first target is (cl_max_rpcs_in_flight + 1) RPCs worth of bytes; if
 * the available grant is already at or below that, the target drops to one
 * RPC.  The result of osc_shrink_grant_to_target() is returned.
 * NOTE(review): the braces are on lines missing from this extract.
 */
934 static int osc_shrink_grant(struct client_obd *cli)
936 __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
937 (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
/* cl_avail_grant is read under the list lock */
939 client_obd_list_lock(&cli->cl_loi_list_lock);
940 if (cli->cl_avail_grant <= target_bytes)
941 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
942 client_obd_list_unlock(&cli->cl_loi_list_lock);
944 return osc_shrink_grant_to_target(cli, target_bytes);
/*
 * Give grant back to the server so that the client keeps @target_bytes.
 * @cli:          client whose grant is shrunk
 * @target_bytes: desired remaining grant; raised to one RPC's worth of
 *                bytes if it is smaller
 *
 * Nothing is done when the target is already at or above the available
 * grant.  Otherwise the surplus is placed in body->oa.o_grant, the obdo is
 * flagged OBD_FL_SHRINK_GRANT, and the body is sent through
 * osc_set_info_async() with KEY_GRANT_SHRINK.  The final visible statement
 * adds the surplus back to the client's grant.
 * NOTE(review): the allocation and release of body, the condition guarding
 * the final give-back (presumably a failed send -- confirm), the return
 * statements and the braces are on lines missing from this extract.
 */
947 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
950 struct ost_body *body;
952 client_obd_list_lock(&cli->cl_loi_list_lock);
953 /* Don't shrink if we are already above or below the desired limit
954 * We don't want to shrink below a single RPC, as that will negatively
955 * impact block allocation and long-term performance. */
956 if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
957 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
959 if (target_bytes >= cli->cl_avail_grant) {
960 client_obd_list_unlock(&cli->cl_loi_list_lock);
963 client_obd_list_unlock(&cli->cl_loi_list_lock);
969 osc_announce_cached(cli, &body->oa, 0);
/* the lock was dropped above; the surplus uses the current grant value */
971 client_obd_list_lock(&cli->cl_loi_list_lock);
972 body->oa.o_grant = cli->cl_avail_grant - target_bytes;
973 cli->cl_avail_grant = target_bytes;
974 client_obd_list_unlock(&cli->cl_loi_list_lock);
975 if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
976 body->oa.o_valid |= OBD_MD_FLFLAGS;
977 body->oa.o_flags = 0;
979 body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
980 osc_update_next_shrink(cli);
982 rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
983 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
984 sizeof(*body), body, NULL);
986 __osc_update_grant(cli, body->oa.o_grant);
/*
 * Decide whether it is time to shrink this client's grant.
 *
 * Visible checks: the server must have negotiated
 * OBD_CONNECT_GRANT_SHRINK; the current time must be within 5 ticks of (or
 * past) cl_next_shrink_grant; the import must be in LUSTRE_IMP_FULL state
 * and hold more available grant than one RPC's worth of bytes.  The next
 * shrink time is rescheduled on a visible path.
 * NOTE(review): every return statement, the else keyword and the braces
 * are on lines missing from this extract, so the returned values are not
 * documented here.
 */
991 static int osc_should_shrink_grant(struct client_obd *client)
993 unsigned long time = cfs_time_current();
994 unsigned long next_shrink = client->cl_next_shrink_grant;
996 if ((client->cl_import->imp_connect_data.ocd_connect_flags &
997 OBD_CONNECT_GRANT_SHRINK) == 0)
1000 if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1001 /* Get the current RPC size directly, instead of going via:
1002 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
1003 * Keep comment here so that it can be found by searching. */
1004 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
1006 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1007 client->cl_avail_grant > brw_size)
1010 osc_update_next_shrink(client);
/*
 * Timeout callback: walks every client linked on item->ti_obd_list (via
 * cl_grant_shrink_list) and shrinks the grant of those for which
 * osc_should_shrink_grant() says so.
 * @item: timeout item carrying the client list
 * @data: opaque argument (not referenced in the visible lines)
 *
 * NOTE(review): the return and the braces are on lines missing from this
 * extract.
 */
1015 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1017 struct client_obd *client;
1019 list_for_each_entry(client, &item->ti_obd_list,
1020 cl_grant_shrink_list) {
1021 if (osc_should_shrink_grant(client))
1022 osc_shrink_grant(client);
/*
 * Register @client with the ptlrpc timeout machinery so that
 * osc_grant_shrink_grant_cb() is invoked every cl_grant_shrink_interval,
 * then schedule the first shrink check.  A registration failure is logged
 * with CERROR.
 * NOTE(review): one argument of ptlrpc_add_timeout_client(), the error
 * test, the return statements and the braces are on lines missing from this
 * extract.
 */
1027 static int osc_add_shrink_grant(struct client_obd *client)
1031 rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1033 osc_grant_shrink_grant_cb, NULL,
1034 &client->cl_grant_shrink_list);
1036 CERROR("add grant client %s error %d\n",
1037 client->cl_import->imp_obd->obd_name, rc);
1040 CDEBUG(D_CACHE, "add grant client %s \n",
1041 client->cl_import->imp_obd->obd_name);
1042 osc_update_next_shrink(client);
/*
 * Unregister @client from the grant-shrink timeout list; returns the
 * result of ptlrpc_del_timeout_client().
 * NOTE(review): the second argument of the call and the braces are on
 * lines missing from this extract.
 */
1046 static int osc_del_shrink_grant(struct client_obd *client)
1048 return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
/*
 * Initialise the client's grant accounting from the server's connect data.
 * @cli: client to initialise
 * @ocd: connect data returned by the server
 *
 * Under cl_loi_list_lock: the available grant becomes ocd_grant when the
 * import is in the EVICTED state, and ocd_grant minus cl_dirty in the other
 * visible assignment; a negative result is logged with CWARN.  The chunk
 * size bits are the larger of PAGE_CACHE_SHIFT and ocd_blocksize.  After
 * unlocking, the values are logged and the client is registered for
 * periodic grant shrinking when the server supports
 * OBD_CONNECT_GRANT_SHRINK and the client is not yet on a shrink list.
 * NOTE(review): this extract has lost the delimiters of the first block
 * comment and the end of the "workaround" comment, along with the else
 * keyword and the braces; no comments are added inside the body for that
 * reason.  The text "we're expect to hold" in the original comment should
 * read "we expect to hold".
 */
1052 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1055 * ocd_grant is the total grant amount we're expect to hold: if we've
1056 * been evicted, it's the new avail_grant amount, cl_dirty will drop
1057 * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1059 * race is tolerable here: if we're evicted, but imp_state already
1060 * left EVICTED state, then cl_dirty must be 0 already.
1062 client_obd_list_lock(&cli->cl_loi_list_lock);
1063 if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1064 cli->cl_avail_grant = ocd->ocd_grant;
1066 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1068 if (cli->cl_avail_grant < 0) {
1069 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1070 cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1071 ocd->ocd_grant, cli->cl_dirty);
1072 /* workaround for servers which do not have the patch from
1074 cli->cl_avail_grant = ocd->ocd_grant;
1077 /* determine the appropriate chunk size used by osc_extent. */
1078 cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1079 client_obd_list_unlock(&cli->cl_loi_list_lock);
1081 CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1082 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1083 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1085 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1086 list_empty(&cli->cl_grant_shrink_list))
1087 osc_add_shrink_grant(cli);
1090 /* We assume that the reason this OSC got a short read is because it read
1091 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1092 * via the LOV, and it _knows_ it's reading inside the file, it's just that
1093 * this stripe never got written at or beyond this stripe offset yet. */
/*
 * Zero-fill the part of the page array that the short read did not cover.
 * @nob_read:   number of bytes actually read
 * @page_count: number of entries in @pga
 * @pga:        array of brw_page pointers describing the read
 *
 * The first loop skips pages fully covered by @nob_read and zeroes the tail
 * of the page in which the data ends; the second loop zeroes every
 * remaining page.
 * NOTE(review): local declarations, index/count updates, kunmap() calls,
 * the break out of the first loop and the braces are on lines missing from
 * this extract.
 */
1094 static void handle_short_read(int nob_read, u32 page_count,
1095 struct brw_page **pga)
1100 /* skip bytes read OK */
1101 while (nob_read > 0) {
1102 LASSERT (page_count > 0);
1104 if (pga[i]->count > nob_read) {
1105 /* EOF inside this page */
1106 ptr = kmap(pga[i]->pg) +
1107 (pga[i]->off & ~CFS_PAGE_MASK);
1108 memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1115 nob_read -= pga[i]->count;
1120 /* zero remaining pages */
1121 while (page_count-- > 0) {
1122 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1123 memset(ptr, 0, pga[i]->count);
/*
 * Validate the per-niobuf return codes and the byte count of a bulk write
 * reply.
 * @req:           completed write request
 * @requested_nob: number of bytes the client asked to write
 * @niocount:      number of remote niobufs, i.e. entries in the RC vector
 * @page_count:    number of pages in @pga (not referenced in the visible
 *                 lines)
 * @pga:           page array (not referenced in the visible lines)
 *
 * A negative entry in the RC vector is returned as the error; a non-zero
 * positive entry and a mismatch between transferred and requested bytes are
 * logged.
 * NOTE(review): local declarations, the size multiplier of the RC vector
 * lookup, the return values of the logged failure paths, the final return
 * and the braces are on lines missing from this extract.
 */
1129 static int check_write_rcs(struct ptlrpc_request *req,
1130 int requested_nob, int niocount,
1131 u32 page_count, struct brw_page **pga)
1136 remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1137 sizeof(*remote_rcs) *
1139 if (remote_rcs == NULL) {
1140 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1144 /* return error if any niobuf was in error */
1145 for (i = 0; i < niocount; i++) {
1146 if ((int)remote_rcs[i] < 0)
1147 return remote_rcs[i];
1149 if (remote_rcs[i] != 0) {
1150 CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1151 i, remote_rcs[i], req);
/* the bulk layer must have moved exactly the requested byte count */
1156 if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1157 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1158 req->rq_bulk->bd_nob_transferred, requested_nob);
/*
 * Tell whether two brw_pages can share one remote niobuf.
 *
 * When the flags differ, a warning is printed if they differ in any bit
 * outside the set known to be safe to combine (FROM_GRANT, NOCACHE, SYNC,
 * ASYNC, NOQUOTA).  The visible return value is non-zero when @p2 starts
 * exactly where @p1 ends.
 * NOTE(review): the statement that follows the flag check (presumably a
 * "return 0" for any flag mismatch -- confirm) and the braces are on lines
 * missing from this extract.
 */
1165 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1167 if (p1->flag != p2->flag) {
1168 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1169 OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1171 /* warn if we try to combine flags that we don't know to be
1172 * safe to combine */
1173 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1174 CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1175 "report this at http://bugs.whamcloud.com/\n",
1176 p1->flag, p2->flag);
/* contiguous in file offset */
1181 return (p1->off + p1->count == p2->off);
/*
 * Compute the checksum of the first @nob bytes described by a page array.
 * @nob:        number of bytes to checksum
 * @pg_count:   number of entries in @pga; must be > 0
 * @pga:        array of brw_page pointers
 * @opc:        OST_READ or OST_WRITE; selects the fault-injection hooks
 * @cksum_type: checksum algorithm, mapped with cksum_obd2cfs()
 *
 * Each page contributes at most its own count, clamped to the bytes still
 * to be processed.  Two fault-injection hooks exist: on reads the data of
 * the first page is corrupted before hashing; on writes only the computed
 * checksum is altered so the data stays correct for a resend.
 * NOTE(review): the function returns u32 but, when hash initialisation
 * fails, returns PTR_ERR(hdesc); the negative errno is converted to an
 * unsigned value and a caller cannot tell it from a checksum -- confirm the
 * intended behaviour.
 * NOTE(review): local declarations, kunmap(), loop-counter updates, the
 * bufsize set-up, the error test before the second hash_final call, the
 * final return and the braces are on lines missing from this extract.
 */
1184 static u32 osc_checksum_bulk(int nob, u32 pg_count,
1185 struct brw_page **pga, int opc,
1186 cksum_type_t cksum_type)
1190 struct cfs_crypto_hash_desc *hdesc;
1191 unsigned int bufsize;
1193 unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
1195 LASSERT(pg_count > 0);
1197 hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1198 if (IS_ERR(hdesc)) {
1199 CERROR("Unable to initialize checksum hash %s\n",
1200 cfs_crypto_hash_name(cfs_alg));
1201 return PTR_ERR(hdesc);
1204 while (nob > 0 && pg_count > 0) {
/* bytes of this page that take part in the checksum */
1205 int count = pga[i]->count > nob ? nob : pga[i]->count;
1207 /* corrupt the data before we compute the checksum, to
1208 * simulate an OST->client data error */
1209 if (i == 0 && opc == OST_READ &&
1210 OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1211 unsigned char *ptr = kmap(pga[i]->pg);
1212 int off = pga[i]->off & ~CFS_PAGE_MASK;
1213 memcpy(ptr + off, "bad1", min(4, nob));
1216 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1217 pga[i]->off & ~CFS_PAGE_MASK,
1220 "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1221 pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1222 (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1223 page_private(pga[i]->pg),
1224 (int)(pga[i]->off & ~CFS_PAGE_MASK));
/* subtracts the whole page count; nob may go negative on the last page */
1226 nob -= pga[i]->count;
1232 err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1235 cfs_crypto_hash_final(hdesc, NULL, NULL);
1237 /* For sending we only compute the wrong checksum instead
1238 * of corrupting the data so it is still correct on a redo */
1239 if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
/*
 * Build a bulk read/write (BRW) request for the pages in @pga.
 *
 * Steps that can be seen here: allocate the request (writes use the
 * import's request pool with RQF_OST_BRW_WRITE, reads use
 * RQF_OST_BRW_READ), work out how many remote niobufs are needed, size and
 * pack the capsule, attach a bulk descriptor and pin every page into it,
 * fill in the ost_body / obd_ioobj / niobuf_remote buffers, handle the
 * checksum flags, and record the parameters in the request's
 * osc_brw_async_args.
 *
 * @cmd:        the OBD_BRW_WRITE bit selects the write path
 * @cli:        client obd; its import is used for the request
 * @lsm:        callers in this file pass NULL
 * @page_count: number of entries in @pga; asserted to be > 0
 * @pga:        pages to transfer; asserted to be strictly increasing in
 *              ->off with no gap in the middle of the array
 * @reqp:       presumably receives the prepared request -- TODO confirm
 * @ocapa:      capability sized and packed into the request; when @reserve
 *              is also set a reference from capa_get() is kept in aa_ocapa
 *
 * The two fault-injection checks at the top return -ENOMEM (recoverable)
 * and -EINVAL (fatal) respectively.
 */
1245 static int osc_brw_prep_request(int cmd, struct client_obd *cli,
1247 struct lov_stripe_md *lsm, u32 page_count,
1248 struct brw_page **pga,
1249 struct ptlrpc_request **reqp,
1250 struct obd_capa *ocapa, int reserve,
1253 struct ptlrpc_request *req;
1254 struct ptlrpc_bulk_desc *desc;
1255 struct ost_body *body;
1256 struct obd_ioobj *ioobj;
1257 struct niobuf_remote *niobuf;
1258 int niocount, i, requested_nob, opc, rc;
1259 struct osc_brw_async_args *aa;
1260 struct req_capsule *pill;
1261 struct brw_page *pg_prev;
1263 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1264 return -ENOMEM; /* Recoverable */
1265 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1266 return -EINVAL; /* Fatal */
1268 if ((cmd & OBD_BRW_WRITE) != 0) {
1270 req = ptlrpc_request_alloc_pool(cli->cl_import,
1271 cli->cl_import->imp_rq_pool,
1272 &RQF_OST_BRW_WRITE);
1275 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
/* Walk adjacent page pairs to work out niocount, the number of remote
 * niobufs the request capsule is sized for below. */
1280 for (niocount = i = 1; i < page_count; i++) {
1281 if (!can_merge_pages(pga[i - 1], pga[i]))
1285 pill = &req->rq_pill;
1286 req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1288 req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1289 niocount * sizeof(*niobuf));
1290 osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1292 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1294 ptlrpc_request_free(req);
1297 req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1298 ptlrpc_at_set_req_timeout(req);
1299 /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1301 req->rq_no_retry_einprogress = 1;
1303 desc = ptlrpc_prep_bulk_imp(req, page_count,
1304 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1305 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1312 /* NB request now owns desc and will free it when it gets freed */
1314 body = req_capsule_client_get(pill, &RMF_OST_BODY);
1315 ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1316 niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1317 LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1319 lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1321 obdo_to_ioobj(oa, ioobj);
1322 ioobj->ioo_bufcnt = niocount;
1323 /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1324 * bulks that might be sent for this request. The actual number is decided
1325 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
1326 * "max - 1" for old client compatibility sending "0", and also so
1327 * the actual maximum is a power-of-two number, not one less. LU-1431 */
1328 ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1329 osc_pack_capa(req, body, ocapa);
1330 LASSERT(page_count > 0);
/* Pin every page into the bulk descriptor; a page that can be merged with
 * its predecessor extends the current niobuf instead of starting a new
 * one. */
1332 for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1333 struct brw_page *pg = pga[i];
1334 int poff = pg->off & ~CFS_PAGE_MASK;
1336 LASSERT(pg->count > 0);
1337 /* make sure there is no gap in the middle of page array */
1338 LASSERTF(page_count == 1 ||
1339 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1340 ergo(i > 0 && i < page_count - 1,
1341 poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
1342 ergo(i == page_count - 1, poff == 0)),
1343 "i: %d/%d pg: %p off: %llu, count: %u\n",
1344 i, page_count, pg, pg->off, pg->count);
1345 LASSERTF(i == 0 || pg->off > pg_prev->off,
1346 "i %d p_c %u pg %p [pri %lu ind %lu] off %llu"
1347 " prev_pg %p [pri %lu ind %lu] off %llu\n",
1349 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1350 pg_prev->pg, page_private(pg_prev->pg),
1351 pg_prev->pg->index, pg_prev->off);
1352 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1353 (pg->flag & OBD_BRW_SRVLOCK));
1355 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1356 requested_nob += pg->count;
1358 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1360 niobuf->len += pg->count;
1362 niobuf->offset = pg->off;
1363 niobuf->len = pg->count;
1364 niobuf->flags = pg->flag;
/* niobuf must now sit exactly niocount entries past the start of the
 * RMF_NIOBUF_REMOTE buffer. */
1369 LASSERTF((void *)(niobuf - niocount) ==
1370 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1371 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1372 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1374 osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1376 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1377 body->oa.o_valid |= OBD_MD_FLFLAGS;
1378 body->oa.o_flags = 0;
1380 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1383 if (osc_should_shrink_grant(cli))
1384 osc_shrink_grant_local(cli, &body->oa);
1386 /* size[REQ_REC_OFF] still sizeof (*body) */
/* Writes: when cl_checksum is set and sptlrpc_flavor_has_bulk() is false
 * for this request's flavor, checksum the outgoing pages and store the
 * result in both the wire obdo and the caller's obdo. */
1387 if (opc == OST_WRITE) {
1388 if (cli->cl_checksum &&
1389 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1390 /* store cl_cksum_type in a local variable since
1391 * it can be changed via lprocfs */
1392 cksum_type_t cksum_type = cli->cl_cksum_type;
1394 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1395 oa->o_flags &= OBD_FL_LOCAL_MASK;
1396 body->oa.o_flags = 0;
1398 body->oa.o_flags |= cksum_type_pack(cksum_type);
1399 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1400 body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1404 CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1406 /* save this in 'oa', too, for later checking */
1407 oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1408 oa->o_flags |= cksum_type_pack(cksum_type);
1410 /* clear out the checksum flag, in case this is a
1411 * resend but cl_checksum is no longer set. b=11238 */
1412 oa->o_valid &= ~OBD_MD_FLCKSUM;
1414 oa->o_cksum = body->oa.o_cksum;
1415 /* 1 RC per niobuf */
1416 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1417 sizeof(__u32) * niocount);
1419 if (cli->cl_checksum &&
1420 !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1421 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1422 body->oa.o_flags = 0;
1423 body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1424 body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1427 ptlrpc_request_set_replen(req);
1429 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1430 aa = ptlrpc_req_async_args(req);
1432 aa->aa_requested_nob = requested_nob;
1433 aa->aa_nio_count = niocount;
1434 aa->aa_page_count = page_count;
1438 INIT_LIST_HEAD(&aa->aa_oaps);
1439 if (ocapa && reserve)
1440 aa->aa_ocapa = capa_get(ocapa);
1446 ptlrpc_req_finished(req);
/*
 * Compare the write checksum computed by the client with the one returned
 * by the server.
 *
 * A match is logged at D_PAGE. On a mismatch the checksum is recomputed
 * over the same pages using the checksum type unpacked from @oa, a
 * diagnostic message is chosen from which of the three values agree, and
 * "BAD WRITE CHECKSUM" is reported on the console together with the raw
 * checksum values and types.
 *
 * @oa:                obdo whose o_valid/o_flags supply the checksum type
 *                     and whose FID/object id are printed
 * @peer:              peer whose NID is printed in the error
 * @client_cksum:      checksum the client computed when sending
 * @server_cksum:      checksum reported by the server
 * @nob, @page_count, @pga: data to re-checksum
 * @client_cksum_type: checksum type the client used originally
 *
 * NOTE(review): the caller uses the result as a boolean; presumably 0 on
 * match and non-zero on mismatch -- confirm.
 */
1450 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1451 __u32 client_cksum, __u32 server_cksum, int nob,
1452 u32 page_count, struct brw_page **pga,
1453 cksum_type_t client_cksum_type)
1457 cksum_type_t cksum_type;
1459 if (server_cksum == client_cksum) {
1460 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1464 cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1466 new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
/* Classify the mismatch by comparing the freshly computed checksum with
 * both reported values. */
1469 if (cksum_type != client_cksum_type)
1470 msg = "the server did not use the checksum type specified in "
1471 "the original request - likely a protocol problem";
1472 else if (new_cksum == server_cksum)
1473 msg = "changed on the client after we checksummed it - "
1474 "likely false positive due to mmap IO (bug 11742)";
1475 else if (new_cksum == client_cksum)
1476 msg = "changed in transit before arrival at OST";
1478 msg = "changed in transit AND doesn't match the original - "
1479 "likely false positive due to mmap IO (bug 11742)";
1481 LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1482 " object "DOSTID" extent [%llu-%llu]\n",
1483 msg, libcfs_nid2str(peer->nid),
1484 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1485 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1486 oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1487 POSTID(&oa->o_oi), pga[0]->off,
1488 pga[page_count-1]->off + pga[page_count-1]->count - 1);
1489 CERROR("original client csum %x (type %x), server csum %x (type %x), "
1490 "client csum now %x\n", client_cksum, client_cksum_type,
1491 server_cksum, cksum_type, new_cksum);
/*
 * Post-process the reply of a bulk read/write request.
 *
 * A negative @rc other than -EDQUOT is logged as a failed request. The
 * reply's ost_body is then unpacked; for writes whose reply carries user
 * or group quota flags osc_quota_setdq() is called, and the grant is
 * updated from the reply with osc_update_grant().
 *
 * Writes go through sptlrpc_cli_unwrap_bulk_write(), check_write_checksum()
 * (only when a checksum was saved in aa->aa_oa) and check_write_rcs().
 *
 * Reads go through sptlrpc_cli_unwrap_bulk_read(), sanity checks of @rc
 * against the requested and transferred byte counts, handle_short_read()
 * for short reads, and checksum verification when the server supplied one.
 *
 * At the end lustre_get_wire_obdo() is called with aa->aa_oa and the
 * reply body's obdo.
 */
1495 /* Note rc enters this function as number of bytes transferred */
1496 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1498 struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1499 const lnet_process_id_t *peer =
1500 &req->rq_import->imp_connection->c_peer;
1501 struct client_obd *cli = aa->aa_cli;
1502 struct ost_body *body;
1503 __u32 client_cksum = 0;
1505 if (rc < 0 && rc != -EDQUOT) {
1506 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1510 LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1511 body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1513 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1517 /* set/clear over quota flag for a uid/gid */
1518 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1519 body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1520 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1522 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid %#llx, flags %x\n",
1523 body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1525 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1528 osc_update_grant(cli, body);
1533 if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1534 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1536 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1538 CERROR("Unexpected +ve rc %d\n", rc);
1541 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1543 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1546 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1547 check_write_checksum(&body->oa, peer, client_cksum,
1548 body->oa.o_cksum, aa->aa_requested_nob,
1549 aa->aa_page_count, aa->aa_ppga,
1550 cksum_type_unpack(aa->aa_oa->o_flags)))
1553 rc = check_write_rcs(req, aa->aa_requested_nob,
1555 aa->aa_page_count, aa->aa_ppga);
1559 /* The rest of this function executes only for OST_READs */
1561 /* if unwrap_bulk failed, return -EAGAIN to retry */
1562 rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1568 if (rc > aa->aa_requested_nob) {
1569 CERROR("Unexpected rc %d (%d requested)\n", rc,
1570 aa->aa_requested_nob);
1574 if (rc != req->rq_bulk->bd_nob_transferred) {
1575 CERROR ("Unexpected rc %d (%d transferred)\n",
1576 rc, req->rq_bulk->bd_nob_transferred);
1580 if (rc < aa->aa_requested_nob)
1581 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
/* The server sent a checksum: recompute it over the received pages with
 * the type taken from the reply and compare. */
1583 if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1584 static int cksum_counter;
1585 __u32 server_cksum = body->oa.o_cksum;
1588 cksum_type_t cksum_type;
1590 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1591 body->oa.o_flags : 0);
1592 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1593 aa->aa_ppga, OST_READ,
1596 if (peer->nid == req->rq_bulk->bd_sender) {
1600 router = libcfs_nid2str(req->rq_bulk->bd_sender);
1603 if (server_cksum != client_cksum) {
1604 LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1605 "%s%s%s inode "DFID" object "DOSTID
1606 " extent [%llu-%llu]\n",
1607 req->rq_import->imp_obd->obd_name,
1608 libcfs_nid2str(peer->nid),
1610 body->oa.o_valid & OBD_MD_FLFID ?
1611 body->oa.o_parent_seq : (__u64)0,
1612 body->oa.o_valid & OBD_MD_FLFID ?
1613 body->oa.o_parent_oid : 0,
1614 body->oa.o_valid & OBD_MD_FLFID ?
1615 body->oa.o_parent_ver : 0,
1616 POSTID(&body->oa.o_oi),
1617 aa->aa_ppga[0]->off,
1618 aa->aa_ppga[aa->aa_page_count-1]->off +
1619 aa->aa_ppga[aa->aa_page_count-1]->count -
1621 CERROR("client %x, server %x, cksum_type %x\n",
1622 client_cksum, server_cksum, cksum_type);
1624 aa->aa_oa->o_cksum = client_cksum;
1628 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1631 } else if (unlikely(client_cksum)) {
1632 static int cksum_missed;
/* x & -x isolates the lowest set bit, so the test holds only when the
 * counter has at most one bit set; this thins out the message as the
 * counter grows. */
1635 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1636 CERROR("Checksum %u requested from %s but not sent\n",
1637 cksum_missed, libcfs_nid2str(peer->nid));
1643 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1644 aa->aa_oa, &body->oa);
/*
 * Rebuild and requeue a bulk request after a recoverable error @rc.
 *
 * A fresh request is prepared from the same pages, obdo and capability via
 * osc_brw_prep_request(). If an oap attached to the old request has been
 * interrupted, the new request is released again. Otherwise the new
 * request inherits the interpret callback, the async args, the import
 * generation, the oap and extent lists and the capability of @request;
 * every oap holding a reference on the old request is switched to the new
 * one, and the new request is handed to ptlrpcd.
 *
 * The send time is pushed back by aa_resends seconds, capped at the new
 * request's timeout.
 */
1649 static int osc_brw_redo_request(struct ptlrpc_request *request,
1650 struct osc_brw_async_args *aa, int rc)
1652 struct ptlrpc_request *new_req;
1653 struct osc_brw_async_args *new_aa;
1654 struct osc_async_page *oap;
1656 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1657 "redo for recoverable error %d", rc);
1659 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1660 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1661 aa->aa_cli, aa->aa_oa,
1662 NULL /* lsm unused by osc currently */,
1663 aa->aa_page_count, aa->aa_ppga,
1664 &new_req, aa->aa_ocapa, 0, 1);
1668 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1669 if (oap->oap_request != NULL) {
1670 LASSERTF(request == oap->oap_request,
1671 "request %p != oap_request %p\n",
1672 request, oap->oap_request);
1673 if (oap->oap_interrupted) {
1674 ptlrpc_req_finished(new_req);
1679 /* New request takes over pga and oaps from old request.
1680 * Note that copying a list_head doesn't work, need to move it... */
1682 new_req->rq_interpret_reply = request->rq_interpret_reply;
1683 new_req->rq_async_args = request->rq_async_args;
1684 /* cap resend delay to the current request timeout, this is similar to
1685 * what ptlrpc does (see after_reply()) */
1686 if (aa->aa_resends > new_req->rq_timeout)
1687 new_req->rq_sent = get_seconds() + new_req->rq_timeout;
1689 new_req->rq_sent = get_seconds() + aa->aa_resends;
1690 new_req->rq_generation_set = 1;
1691 new_req->rq_import_generation = request->rq_import_generation;
1693 new_aa = ptlrpc_req_async_args(new_req);
1695 INIT_LIST_HEAD(&new_aa->aa_oaps);
1696 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1697 INIT_LIST_HEAD(&new_aa->aa_exts);
1698 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1699 new_aa->aa_resends = aa->aa_resends;
/* Swap each oap's request reference from the old request to the new one. */
1701 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1702 if (oap->oap_request) {
1703 ptlrpc_req_finished(oap->oap_request);
1704 oap->oap_request = ptlrpc_request_addref(new_req);
1708 new_aa->aa_ocapa = aa->aa_ocapa;
1709 aa->aa_ocapa = NULL;
1711 /* XXX: This code will run into problem if we're going to support
1712 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1713 * and wait for all of them to be finished. We should inherit request
1714 * set from old request. */
1715 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1717 DEBUG_REQ(D_INFO, new_req, "new request");
1722 * ugh, we want disk allocation on the target to happen in offset order. we'll
1723 * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1724 * fine for our small page arrays and doesn't require allocation. it's an
1725 * insertion sort that swaps elements that are strides apart, shrinking the
1726 * stride down until it's '1' and the array is sorted.
/* Sort the @num brw_page pointers in @array in place by ascending ->off. */
1728 static void sort_brw_pages(struct brw_page **array, int num)
1731 struct brw_page *tmp;
/* Grow the stride through 1, 4, 13, ... until it is no longer below num. */
1735 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1740 for (i = stride ; i < num ; i++) {
/* Move entries with a larger offset up by one stride to make room for
 * tmp. */
1743 while (j >= stride && array[j - stride]->off > tmp->off) {
1744 array[j] = array[j - stride];
1749 } while (stride > 1);
/*
 * Free the @count-entry array of brw_page pointers @ppga, which must not
 * be NULL. Only the pointer array itself is freed here.
 */
1752 static void osc_release_ppga(struct brw_page **ppga, u32 count)
1754 LASSERT(ppga != NULL);
1755 OBD_FREE(ppga, sizeof(*ppga) * count);
/*
 * Reply interpreter for bulk requests; osc_build_rpc() installs it as
 * rq_interpret_reply, with the request's osc_brw_async_args as @data.
 *
 * It runs osc_brw_fini_request() on the reply. For a recoverable error it
 * either redoes the request through osc_brw_redo_request() or logs why the
 * request is not resent (import generation changed, or too many resends).
 * Afterwards it drops the capability, finishes every extent attached to
 * the request, pushes block count and timestamps from the obdo into the
 * cl_object, frees the obdo and the page array, completes the cl_req,
 * decrements the read or write in-flight counter under cl_loi_list_lock,
 * wakes cache waiters and unplugs further I/O.
 */
1758 static int brw_interpret(const struct lu_env *env,
1759 struct ptlrpc_request *req, void *data, int rc)
1761 struct osc_brw_async_args *aa = data;
1762 struct osc_extent *ext;
1763 struct osc_extent *tmp;
1764 struct cl_object *obj = NULL;
1765 struct client_obd *cli = aa->aa_cli;
1767 rc = osc_brw_fini_request(req, rc);
1768 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1769 /* When server return -EINPROGRESS, client should always retry
1770 * regardless of the number of times the bulk was resent already. */
1771 if (osc_recoverable_error(rc)) {
1772 if (req->rq_import_generation !=
1773 req->rq_import->imp_generation) {
1774 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1775 ""DOSTID", rc = %d.\n",
1776 req->rq_import->imp_obd->obd_name,
1777 POSTID(&aa->aa_oa->o_oi), rc);
1778 } else if (rc == -EINPROGRESS ||
1779 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1780 rc = osc_brw_redo_request(req, aa, rc);
1782 CERROR("%s: too many resent retries for object: %llu:%llu, rc = %d.\n",
1783 req->rq_import->imp_obd->obd_name,
1784 POSTID(&aa->aa_oa->o_oi), rc);
1789 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1794 capa_put(aa->aa_ocapa);
1795 aa->aa_ocapa = NULL;
/* Finish every extent. On success the first extent supplies the object
 * whose attributes are updated further down. */
1798 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1799 if (obj == NULL && rc == 0) {
1800 obj = osc2cl(ext->oe_obj);
1804 list_del_init(&ext->oe_link);
1805 osc_extent_finish(env, ext, 1, rc);
1807 LASSERT(list_empty(&aa->aa_exts));
1808 LASSERT(list_empty(&aa->aa_oaps));
1811 struct obdo *oa = aa->aa_oa;
1812 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1813 unsigned long valid = 0;
1816 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1817 attr->cat_blocks = oa->o_blocks;
1818 valid |= CAT_BLOCKS;
1820 if (oa->o_valid & OBD_MD_FLMTIME) {
1821 attr->cat_mtime = oa->o_mtime;
1824 if (oa->o_valid & OBD_MD_FLATIME) {
1825 attr->cat_atime = oa->o_atime;
1828 if (oa->o_valid & OBD_MD_FLCTIME) {
1829 attr->cat_ctime = oa->o_ctime;
1833 cl_object_attr_lock(obj);
1834 cl_object_attr_set(env, obj, attr, valid);
1835 cl_object_attr_unlock(obj);
1837 cl_object_put(env, obj);
1839 OBDO_FREE(aa->aa_oa);
1841 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1842 req->rq_bulk->bd_nob_transferred);
1843 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1844 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1846 client_obd_list_lock(&cli->cl_loi_list_lock);
1847 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
1848 * is called so we know whether to go to sync BRWs or wait for more
1849 * RPCs to complete */
1850 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
1851 cli->cl_w_in_flight--;
1853 cli->cl_r_in_flight--;
1854 osc_wake_cache_waiters(cli);
1855 client_obd_list_unlock(&cli->cl_loi_list_lock);
1857 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
1862 * Build an RPC from the list of extents @ext_list. The caller must ensure
1863 * that the total number of pages in this list does NOT exceed max pages per RPC.
1864 * Extents in the list must be in OES_RPC state.
/*
 * @env:      environment passed through to the cl_req/cl_object calls
 * @cli:      client obd the RPC is built for; its in-flight counters and
 *            histograms are updated under cl_loi_list_lock
 * @ext_list: non-empty list of extents; once the request is built it is
 *            spliced onto the request's aa_exts, while on the error path
 *            every extent still on it is finished with the error code
 * @cmd:      the OBD_BRW_WRITE bit selects CRT_WRITE for the cl_req;
 *            cmd == OBD_BRW_READ selects the read statistics
 * @pol:      policy handed to ptlrpcd_add_req()
 *
 * The pages of all extents are collected, sorted by offset with
 * sort_brw_pages(), turned into a request by osc_brw_prep_request() with
 * brw_interpret() as reply handler, and queued to ptlrpcd.
 */
1866 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
1867 struct list_head *ext_list, int cmd, pdl_policy_t pol)
1869 struct ptlrpc_request *req = NULL;
1870 struct osc_extent *ext;
1871 struct brw_page **pga = NULL;
1872 struct osc_brw_async_args *aa = NULL;
1873 struct obdo *oa = NULL;
1874 struct osc_async_page *oap;
1875 struct osc_async_page *tmp;
1876 struct cl_req *clerq = NULL;
1877 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
1879 struct ldlm_lock *lock = NULL;
1880 struct cl_req_attr *crattr = NULL;
1881 u64 starting_offset = OBD_OBJECT_EOF;
1882 u64 ending_offset = 0;
1888 LIST_HEAD(rpc_list);
1890 LASSERT(!list_empty(ext_list));
1892 /* add pages into rpc_list to build BRW rpc */
1893 list_for_each_entry(ext, ext_list, oe_link) {
1894 LASSERT(ext->oe_state == OES_RPC);
1895 mem_tight |= ext->oe_memalloc;
1896 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
1898 list_add_tail(&oap->oap_rpc_item, &rpc_list);
1899 if (starting_offset > oap->oap_obj_off)
1900 starting_offset = oap->oap_obj_off;
1902 LASSERT(oap->oap_page_off == 0);
1903 if (ending_offset < oap->oap_obj_off + oap->oap_count)
1904 ending_offset = oap->oap_obj_off +
1907 LASSERT(oap->oap_page_off + oap->oap_count ==
1913 mpflag = cfs_memory_pressure_get_and_set();
1915 OBD_ALLOC(crattr, sizeof(*crattr));
1916 if (crattr == NULL) {
1921 OBD_ALLOC(pga, sizeof(*pga) * page_count);
/* Fill pga[] from the collected oaps and add each page to the cl_req,
 * which is allocated when the first page is seen. */
1934 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
1935 struct cl_page *page = oap2cl_page(oap);
1936 if (clerq == NULL) {
1937 clerq = cl_req_alloc(env, page, crt,
1938 1 /* only 1-object rpcs for now */);
1939 if (IS_ERR(clerq)) {
1940 rc = PTR_ERR(clerq);
1943 lock = oap->oap_ldlm_lock;
1946 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
1947 pga[i] = &oap->oap_brw_page;
1948 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
1949 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
1950 pga[i]->pg, page_index(oap->oap_page), oap,
1953 cl_req_page_add(env, clerq, page);
1956 /* always get the data for the obdo for the rpc */
1957 LASSERT(clerq != NULL);
1958 crattr->cra_oa = oa;
1959 cl_req_attr_set(env, clerq, crattr, ~0ULL);
1961 oa->o_handle = lock->l_remote_handle;
1962 oa->o_valid |= OBD_MD_FLHANDLE;
1965 rc = cl_req_prep(env, clerq);
1967 CERROR("cl_req_prep failed: %d\n", rc);
1971 sort_brw_pages(pga, page_count);
1972 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
1973 pga, &req, crattr->cra_capa, 1, 0);
1975 CERROR("prep_req failed: %d\n", rc);
1979 req->rq_interpret_reply = brw_interpret;
1982 req->rq_memalloc = 1;
1984 /* Need to update the timestamps after the request is built in case
1985 * we race with setattr (locally or in queue at OST). If OST gets
1986 * later setattr before earlier BRW (as determined by the request xid),
1987 * the OST will not use BRW timestamps. Sadly, there is no obvious
1988 * way to do this in a single call. bug 10150 */
1989 cl_req_attr_set(env, clerq, crattr,
1990 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
1992 lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
/* Hand the collected oaps and the caller's extents over to the request's
 * async args so brw_interpret() can find them. */
1994 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1995 aa = ptlrpc_req_async_args(req);
1996 INIT_LIST_HEAD(&aa->aa_oaps);
1997 list_splice_init(&rpc_list, &aa->aa_oaps);
1998 INIT_LIST_HEAD(&aa->aa_exts);
1999 list_splice_init(ext_list, &aa->aa_exts);
2000 aa->aa_clerq = clerq;
2002 /* queued sync pages can be torn down while the pages
2003 * were between the pending list and the rpc */
2005 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2006 /* only one oap gets a request reference */
2009 if (oap->oap_interrupted && !req->rq_intr) {
2010 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2012 ptlrpc_mark_interrupted(req);
2016 tmp->oap_request = ptlrpc_request_addref(req);
2018 client_obd_list_lock(&cli->cl_loi_list_lock);
2019 starting_offset >>= PAGE_CACHE_SHIFT;
2020 if (cmd == OBD_BRW_READ) {
2021 cli->cl_r_in_flight++;
2022 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2023 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2024 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2025 starting_offset + 1);
2027 cli->cl_w_in_flight++;
2028 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2029 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2030 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2031 starting_offset + 1);
2033 client_obd_list_unlock(&cli->cl_loi_list_lock);
2035 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2036 page_count, aa, cli->cl_r_in_flight,
2037 cli->cl_w_in_flight);
2039 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2040 * see which CPU/NUMA node the majority of pages were allocated
2041 * on, and try to assign the async RPC to the CPU core
2042 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2044 * But on the other hand, we expect that multiple ptlrpcd
2045 * threads and the initial write sponsor can run in parallel,
2046 * especially when data checksum is enabled, which is CPU-bound
2047 * operation and single ptlrpcd thread cannot process in time.
2048 * So more ptlrpcd threads sharing BRW load
2049 * (with PDL_POLICY_ROUND) seems better.
2051 ptlrpcd_add_req(req, pol, -1);
2056 cfs_memory_pressure_restore(mpflag);
2058 if (crattr != NULL) {
2059 capa_put(crattr->cra_capa);
2060 OBD_FREE(crattr, sizeof(*crattr));
2064 LASSERT(req == NULL);
2069 OBD_FREE(pga, sizeof(*pga) * page_count);
2070 /* this should happen rarely and is pretty bad, it makes the
2071 * pending list not follow the dirty order */
2072 while (!list_empty(ext_list)) {
2073 ext = list_entry(ext_list->next, struct osc_extent,
2075 list_del_init(&ext->oe_link);
2076 osc_extent_finish(env, ext, 0, rc);
2078 if (clerq && !IS_ERR(clerq))
2079 cl_req_completion(env, clerq, rc);
/*
 * Attach einfo->ei_cbdata to @lock as its l_ast_data when none is set yet.
 *
 * The lock's blocking, completion and glimpse callbacks and its resource
 * type are asserted to match @einfo. l_ast_data is examined and updated
 * while holding both lock_res_and_lock() and the osc_ast_guard spinlock.
 *
 * NOTE(review): the result appears to report whether l_ast_data equals the
 * requested data afterwards; callers use it as a boolean -- confirm.
 */
2084 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2085 struct ldlm_enqueue_info *einfo)
2087 void *data = einfo->ei_cbdata;
2090 LASSERT(lock != NULL);
2091 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2092 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2093 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2094 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2096 lock_res_and_lock(lock);
2097 spin_lock(&osc_ast_guard);
2099 if (lock->l_ast_data == NULL)
2100 lock->l_ast_data = data;
2101 if (lock->l_ast_data == data)
2104 spin_unlock(&osc_ast_guard);
2105 unlock_res_and_lock(lock);
/*
 * Handle-based wrapper around osc_set_lock_data_with_check(): resolves
 * @lockh with ldlm_handle2lock(), runs the check on the lock and drops the
 * lock reference again.
 *
 * NOTE(review): the CERROR presumably covers a handle that no longer
 * resolves to a lock (its text mentions eviction) -- confirm.
 */
2110 static int osc_set_data_with_check(struct lustre_handle *lockh,
2111 struct ldlm_enqueue_info *einfo)
2113 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2117 set = osc_set_lock_data_with_check(lock, einfo);
2118 LDLM_LOCK_PUT(lock);
2120 CERROR("lockh %p, data %p - client evicted?\n",
2121 lockh, einfo->ei_cbdata);
/*
 * osc_find_cbdata(): build the LDLM resource name from lsm->lsm_oi and run
 * the iterator @replace with @data over that resource in the obd's
 * namespace through ldlm_resource_iterate(). The LDLM_ITER_STOP and
 * LDLM_ITER_CONTINUE results are handled specially.
 */
2125 /* find any ldlm lock of the inode in osc
2129 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2130 ldlm_iterator_t replace, void *data)
2132 struct ldlm_res_id res_id;
2133 struct obd_device *obd = class_exp2obd(exp);
2136 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2137 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2138 if (rc == LDLM_ITER_STOP)
2140 if (rc == LDLM_ITER_CONTINUE)
2145 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2146 obd_enqueue_update_f upcall, void *cookie,
2147 __u64 *flags, int agl, int rc)
2149 int intent = *flags & LDLM_FL_HAS_INTENT;
2152 /* The request was created before ldlm_cli_enqueue call. */
/*
 * OSC-side completion of a lock enqueue.
 *
 * For an aborted lock the status carried in the reply's lock_policy_res1
 * is converted with ptlrpc_status_ntoh() and, when non-zero, replaces @rc.
 * When the LVB is usable LDLM_FL_LVB_READY is set in *@flags and the LVB
 * contents are logged. Finally @upcall is invoked with @cookie and the
 * resulting rc.
 */
2153 if (rc == ELDLM_LOCK_ABORTED) {
2154 struct ldlm_reply *rep;
2155 rep = req_capsule_server_get(&req->rq_pill,
2158 LASSERT(rep != NULL);
2159 rep->lock_policy_res1 =
2160 ptlrpc_status_ntoh(rep->lock_policy_res1);
2161 if (rep->lock_policy_res1)
2162 rc = rep->lock_policy_res1;
2166 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2168 *flags |= LDLM_FL_LVB_READY;
2169 CDEBUG(D_INODE, "got kms %llu blocks %llu mtime %llu\n",
2170 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2173 /* Call the update callback. */
2174 rc = (*upcall)(cookie, rc);
/*
 * Reply interpreter for asynchronous lock enqueues; osc_enqueue_base()
 * installs it as rq_interpret_reply.
 *
 * The lock handle and mode are copied out of @aa first, an extra lock
 * reference is taken around the completion, the LDLM side is finished with
 * ldlm_cli_enqueue_fini(), then the OSC side with osc_enqueue_fini() (which
 * calls the upcall), and finally the references held here are dropped.
 */
2178 static int osc_enqueue_interpret(const struct lu_env *env,
2179 struct ptlrpc_request *req,
2180 struct osc_enqueue_args *aa, int rc)
2182 struct ldlm_lock *lock;
2183 struct lustre_handle handle;
2185 struct ost_lvb *lvb;
2187 __u64 *flags = aa->oa_flags;
2189 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2190 * might be freed anytime after lock upcall has been called. */
2191 lustre_handle_copy(&handle, aa->oa_lockh);
2192 mode = aa->oa_ei->ei_mode;
2194 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2196 lock = ldlm_handle2lock(&handle);
2198 /* Take an additional reference so that a blocking AST that
2199 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2200 * to arrive after an upcall has been executed by
2201 * osc_enqueue_fini(). */
2202 ldlm_lock_addref(&handle, mode);
2204 /* Let CP AST to grant the lock first. */
2205 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2207 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2212 lvb_len = sizeof(*aa->oa_lvb);
2215 /* Complete obtaining the lock procedure. */
2216 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2217 mode, flags, lvb, lvb_len, &handle, rc);
2218 /* Complete osc stuff. */
2219 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2220 flags, aa->oa_agl, rc);
2222 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2224 /* Release the lock for async request. */
2225 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2227 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2228 * not already released by
2229 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2231 ldlm_lock_decref(&handle, mode);
2233 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2234 aa->oa_lockh, req, aa);
2235 ldlm_lock_decref(&handle, mode);
2236 LDLM_LOCK_PUT(lock);
/*
 * Sentinel value, not a real request set: osc_enqueue_base() compares its
 * rqset argument against it and, on a match, queues the request with
 * ptlrpcd_add_req() rather than ptlrpc_set_add_req().
 */
2240 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2242 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2243 * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2244 * other synchronous requests, however keeping some locks and trying to obtain
2245 * others may take a considerable amount of time in case of OST failure; and
2246 * when other sync requests do not get a released lock from a client, the client
2247 * is excluded from the cluster -- such scenarios make life difficult, so
2248 * release locks just after they are obtained. */
/*
 * Enqueue an extent lock, reusing an already cached lock when possible.
 *
 * The extent in @policy is first widened to page boundaries in place. An
 * existing lock is then searched for with ldlm_lock_match(); when a usable
 * one is found, LDLM_FL_LVB_READY is set in *@flags and @upcall is called
 * with ELDLM_OK, and no RPC is sent. Otherwise a new enqueue goes through
 * ldlm_cli_enqueue(). For an asynchronous enqueue the arguments are saved
 * in the request's osc_enqueue_args, osc_enqueue_interpret() is installed
 * as reply handler and the request is given to ptlrpcd (when @rqset is
 * PTLRPCD_SET) or added to @rqset; otherwise osc_enqueue_fini() is called
 * directly.
 */
2249 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2250 __u64 *flags, ldlm_policy_data_t *policy,
2251 struct ost_lvb *lvb, int kms_valid,
2252 obd_enqueue_update_f upcall, void *cookie,
2253 struct ldlm_enqueue_info *einfo,
2254 struct lustre_handle *lockh,
2255 struct ptlrpc_request_set *rqset, int async, int agl)
2257 struct obd_device *obd = exp->exp_obd;
2258 struct ptlrpc_request *req = NULL;
2259 int intent = *flags & LDLM_FL_HAS_INTENT;
2260 __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2264 /* Filesystem lock extents are extended to page boundaries so that
2265 * dealing with the page cache is a little smoother. */
2266 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2267 policy->l_extent.end |= ~CFS_PAGE_MASK;
2270 * kms is not valid when either object is completely fresh (so that no
2271 * locks are cached), or object was evicted. In the latter case cached
2272 * lock cannot be used, because it would prime inode state with
2273 * potentially stale LVB.
2278 /* Next, search for already existing extent locks that will cover us */
2279 /* If we're trying to read, we also search for an existing PW lock. The
2280 * VFS and page cache already protect us locally, so lots of readers/
2281 * writers can share a single PW lock.
2283 * There are problems with conversion deadlocks, so instead of
2284 * converting a read lock to a write lock, we'll just enqueue a new
2287 * At some point we should cancel the read lock instead of making them
2288 * send us a blocking callback, but there are problems with canceling
2289 * locks out from other users right now, too. */
2290 mode = einfo->ei_mode;
2291 if (einfo->ei_mode == LCK_PR)
2293 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2294 einfo->ei_type, policy, mode, lockh, 0);
2296 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2298 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2299 /* For AGL, if enqueue RPC is sent but the lock is not
2300 * granted, then skip to process this stripe.
2301 * Return -ECANCELED to tell the caller. */
2302 ldlm_lock_decref(lockh, mode);
2303 LDLM_LOCK_PUT(matched);
2305 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2306 *flags |= LDLM_FL_LVB_READY;
2307 /* addref the lock only if not async requests and PW
2308 * lock is matched whereas we asked for PR. */
2309 if (!rqset && einfo->ei_mode != mode)
2310 ldlm_lock_addref(lockh, LCK_PR);
2312 /* I would like to be able to ASSERT here that
2313 * rss <= kms, but I can't, for reasons which
2314 * are explained in lov_enqueue() */
2317 /* We already have a lock, and it's referenced.
2319 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2320 * AGL upcall may change it to CLS_HELD directly. */
2321 (*upcall)(cookie, ELDLM_OK);
2323 if (einfo->ei_mode != mode)
2324 ldlm_lock_decref(lockh, LCK_PW);
2326 /* For async requests, decref the lock. */
2327 ldlm_lock_decref(lockh, einfo->ei_mode);
2328 LDLM_LOCK_PUT(matched);
2331 ldlm_lock_decref(lockh, mode);
2332 LDLM_LOCK_PUT(matched);
2339 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2340 &RQF_LDLM_ENQUEUE_LVB);
2344 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2346 ptlrpc_request_free(req);
2350 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2352 ptlrpc_request_set_replen(req);
2355 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2356 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2358 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2359 sizeof(*lvb), LVB_T_OST, lockh, async);
/* Async case: stash what osc_enqueue_interpret() needs in the request's
 * async args before queueing the request. */
2362 struct osc_enqueue_args *aa;
2363 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2364 aa = ptlrpc_req_async_args(req);
2367 aa->oa_flags = flags;
2368 aa->oa_upcall = upcall;
2369 aa->oa_cookie = cookie;
2371 aa->oa_lockh = lockh;
2374 req->rq_interpret_reply =
2375 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2376 if (rqset == PTLRPCD_SET)
2377 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2379 ptlrpc_set_add_req(rqset, req);
2380 } else if (intent) {
2381 ptlrpc_req_finished(req);
2386 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2388 ptlrpc_req_finished(req);
/*
 * Look for an existing lock covering @policy with ldlm_lock_match().
 *
 * The extent in @policy is widened to page boundaries in place before the
 * search. When a lock is matched but @data cannot be attached to it via
 * osc_set_data_with_check(), the reference is dropped again unless
 * LDLM_FL_TEST_LOCK is set. When the matched mode differs from the
 * requested @mode and this is not a test lock, a PR reference is taken and
 * the PW reference is released.
 */
2393 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2394 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2395 __u64 *flags, void *data, struct lustre_handle *lockh,
2398 struct obd_device *obd = exp->exp_obd;
2399 __u64 lflags = *flags;
2402 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2405 /* Filesystem lock extents are extended to page boundaries so that
2406 * dealing with the page cache is a little smoother */
2407 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2408 policy->l_extent.end |= ~CFS_PAGE_MASK;
2410 /* Next, search for already existing extent locks that will cover us */
2411 /* If we're trying to read, we also search for an existing PW lock. The
2412 * VFS and page cache already protect us locally, so lots of readers/
2413 * writers can share a single PW lock. */
2417 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2418 res_id, type, policy, rc, lockh, unref);
2421 if (!osc_set_data_with_check(lockh, data)) {
2422 if (!(lflags & LDLM_FL_TEST_LOCK))
2423 ldlm_lock_decref(lockh, rc);
2427 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2428 ldlm_lock_addref(lockh, LCK_PR);
2429 ldlm_lock_decref(lockh, LCK_PW);
/*
 * Drop one reference of @mode on the lock behind @lockh.
 *
 * LCK_GROUP locks are released with ldlm_lock_decref_and_cancel(); every
 * other mode goes through plain ldlm_lock_decref().  The return statement
 * is not visible in this listing.
 */
2436 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2438 if (unlikely(mode == LCK_GROUP))
2439 ldlm_lock_decref_and_cancel(lockh, mode);
2441 ldlm_lock_decref(lockh, mode);
/*
 * Reply handler for the asynchronous OST_STATFS request; it is installed
 * as rq_interpret_reply by osc_statfs_async().
 *
 * On the visible success path the obd_statfs structure from the reply is
 * copied into aa->aa_oi->oi_osfs, and the caller's oi_cb_up() callback is
 * invoked with the resulting rc.
 *
 * NOTE(review): the early-exit test that the first comment refers to, the
 * body of the NODELAY branch and the NULL check on msfs are not visible in
 * this listing -- confirm against the complete file.
 */
2446 static int osc_statfs_interpret(const struct lu_env *env,
2447 struct ptlrpc_request *req,
2448 struct osc_async_args *aa, int rc)
2450 struct obd_statfs *msfs;
2453 /* The request has in fact never been sent
2454 * due to issues at a higher level (LOV).
2455 * Exit immediately since the caller is
2456 * aware of the problem and takes care
2457 * of the clean up */
/* Connection-level failures get special treatment when the caller
 * asked for a non-blocking statfs (OBD_STATFS_NODELAY). */
2460 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2461 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY)) {
2469 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
/* Hand the server's answer to the caller-provided buffer. */
2475 *aa->aa_oi->oi_osfs = *msfs;
2477 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*
 * Queue an asynchronous OST_STATFS request on @rqset.
 *
 * A request is allocated on the device's client import, packed, directed
 * at OST_CREATE_PORTAL and given an adaptive timeout.  With
 * OBD_STATFS_NODELAY in @oinfo->oi_flags the request is marked no-resend
 * and no-delay.  osc_statfs_interpret() is installed as the reply handler
 * before the request is added to @rqset.
 *
 * @max_age is not used by any visible line (see the comment below).
 *
 * NOTE(review): the allocation/pack error checks, the initialisation of
 * the async args and the return statements are not visible in this
 * listing.
 */
2481 static int osc_statfs_async(struct obd_export *exp,
2482 struct obd_info *oinfo, __u64 max_age,
2483 struct ptlrpc_request_set *rqset)
2485 struct obd_device *obd = class_exp2obd(exp);
2486 struct ptlrpc_request *req;
2487 struct osc_async_args *aa;
2490 /* We could possibly pass max_age in the request (as an absolute
2491 * timestamp or a "seconds.usec ago") so the target can avoid doing
2492 * extra calls into the filesystem if that isn't necessary (e.g.
2493 * during mount that would help a bit). Having relative timestamps
2494 * is not so great if request processing is slow, while absolute
2495 * timestamps are not ideal because they need time synchronization. */
2496 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2500 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2502 ptlrpc_request_free(req);
2505 ptlrpc_request_set_replen(req);
2506 req->rq_request_portal = OST_CREATE_PORTAL;
2507 ptlrpc_at_set_req_timeout(req);
2509 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2510 /* procfs requests must not block in statfs, to avoid deadlock */
2511 req->rq_no_resend = 1;
2512 req->rq_no_delay = 1;
2515 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
/* Compile-time check that the args struct fits in rq_async_args. */
2516 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2517 aa = ptlrpc_req_async_args(req);
2520 ptlrpc_set_add_req(rqset, req);
/*
 * Synchronous OST_STATFS.
 *
 * A reference on the client import is taken under cl_sem (read side), the
 * request is allocated on that import, and the import reference is dropped
 * again.  The request is packed, sent through OST_CREATE_PORTAL and waited
 * for with ptlrpc_queue_wait().  With OBD_STATFS_NODELAY in @flags it is
 * marked no-resend and no-delay.  The reply's obd_statfs is fetched into
 * msfs and the request is released with ptlrpc_req_finished().
 *
 * @max_age is not used by any visible line (see the comment below).
 *
 * NOTE(review): the NULL-import check, the error checks, the copy of msfs
 * into @osfs and the return statements are not visible in this listing.
 */
2524 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2525 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2527 struct obd_device *obd = class_exp2obd(exp);
2528 struct obd_statfs *msfs;
2529 struct ptlrpc_request *req;
2530 struct obd_import *imp = NULL;
2533 /* Since the request might also come from lprocfs, we need to
2534 * sync this with client_disconnect_export(); Bug15684 */
2535 down_read(&obd->u.cli.cl_sem);
2536 if (obd->u.cli.cl_import)
2537 imp = class_import_get(obd->u.cli.cl_import);
2538 up_read(&obd->u.cli.cl_sem);
2542 /* We could possibly pass max_age in the request (as an absolute
2543 * timestamp or a "seconds.usec ago") so the target can avoid doing
2544 * extra calls into the filesystem if that isn't necessary (e.g.
2545 * during mount that would help a bit). Having relative timestamps
2546 * is not so great if request processing is slow, while absolute
2547 * timestamps are not ideal because they need time synchronization. */
2548 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
/* The import reference taken above is released right after the
 * allocation. */
2550 class_import_put(imp);
2555 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2557 ptlrpc_request_free(req);
2560 ptlrpc_request_set_replen(req);
2561 req->rq_request_portal = OST_CREATE_PORTAL;
2562 ptlrpc_at_set_req_timeout(req);
2564 if (flags & OBD_STATFS_NODELAY) {
2565 /* procfs requests must not block in statfs, to avoid deadlock */
2566 req->rq_no_resend = 1;
2567 req->rq_no_delay = 1;
2570 rc = ptlrpc_queue_wait(req);
2574 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2583 ptlrpc_req_finished(req);
2587 /* Retrieve object striping information.
 *
 * Copies the v1-sized header of the caller's lov_user_md from @lump,
 * accepts only LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3, fills in the
 * object id from @lsm (lmm_oi, plus the first lmm_objects slot when the
 * caller's lmm_stripe_count is non-zero), reports lmm_stripe_count = 1
 * and copies the result back to @lump.
 *
 * NOTE(review): lmm_stripe_count comes from user space and sizes the
 * OBD_ALLOC below; no upper-bound check is visible in this listing --
 * confirm.  The error returns, the buffer used when the stripe count is
 * zero and the condition guarding the final OBD_FREE are not shown here
 * either.
 *
2589 * @lump is the user-space lov_user_md; its lmm_stripe_count is used to
2590 * size the reply buffer that is copied back to the user.
2591 * lmm_magic must be LOV_USER_MAGIC_V1 or _V3 (we only use 1 slot here).
2593 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2595 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2596 struct lov_user_md_v3 lum, *lumk;
2597 struct lov_user_ost_data_v1 *lmm_objects;
2598 int rc = 0, lum_size;
2603 /* we only need the header part from user space to get lmm_magic and
2604 * lmm_stripe_count, (the header part is common to v1 and v3) */
2605 lum_size = sizeof(struct lov_user_md_v1);
2606 if (copy_from_user(&lum, lump, lum_size))
2609 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2610 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2613 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2614 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2615 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2616 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2618 /* we can use lov_mds_md_size() to compute lum_size
2619 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2620 if (lum.lmm_stripe_count > 0) {
2621 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2622 OBD_ALLOC(lumk, lum_size);
/* The objects array sits at a different offset in the v1 and v3
 * layouts, hence the cast for v1. */
2626 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2628 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2630 lmm_objects = &(lumk->lmm_objects[0]);
2631 lmm_objects->l_ost_oi = lsm->lsm_oi;
2633 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2637 lumk->lmm_oi = lsm->lsm_oi;
2638 lumk->lmm_stripe_count = 1;
2640 if (copy_to_user(lump, lumk, lum_size))
2644 OBD_FREE(lumk, lum_size);
/*
 * ioctl dispatcher for an OSC export.
 *
 * A module reference is held for the duration of the call
 * (try_module_get() at entry, module_put() at exit).  The visible commands
 * are:
 *   OBD_IOC_LOV_GET_CONFIG  - fabricate a single-target lov_desc plus the
 *                             device uuid and copy it back to @uarg
 *   LL_IOC_LOV_SETSTRIPE    - obd_alloc_memmd(exp, karg)
 *   LL_IOC_LOV_GETSTRIPE    - osc_getstripe(karg, uarg)
 *   OBD_IOC_CLIENT_RECOVER  - ptlrpc_recover_import() on the client import
 *   IOC_OSC_SET_ACTIVE      - ptlrpc_set_import_active()
 *   OBD_IOC_POLL_QUOTACHECK - osc_quota_poll_check()
 *   OBD_IOC_PING_TARGET     - ptlrpc_obd_ping()
 * Anything else is logged as unrecognised.
 *
 * NOTE(review): the switch statement itself, the error codes assigned to
 * err and the return are not visible in this listing.
 */
2650 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2651 void *karg, void *uarg)
2653 struct obd_device *obd = exp->exp_obd;
2654 struct obd_ioctl_data *data = karg;
/* NOTE(review): unlike the other log calls in this file, the message
 * below has no trailing newline -- confirm whether that is intended. */
2657 if (!try_module_get(THIS_MODULE)) {
2658 CERROR("Can't get module. Is it alive?");
2662 case OBD_IOC_LOV_GET_CONFIG: {
2664 struct lov_desc *desc;
2665 struct obd_uuid uuid;
/* Pull the whole ioctl payload from user space into buf. */
2669 if (obd_ioctl_getdata(&buf, &len, (void *)uarg)) {
2674 data = (struct obd_ioctl_data *)buf;
/* Inline buffer 1 must be able to hold a lov_desc ... */
2676 if (sizeof(*desc) > data->ioc_inllen1) {
2677 obd_ioctl_freedata(buf, len);
/* ... and inline buffer 2 an obd_uuid. */
2682 if (data->ioc_inllen2 < sizeof(uuid)) {
2683 obd_ioctl_freedata(buf, len);
/* Describe this OSC as a one-target "LOV". */
2688 desc = (struct lov_desc *)data->ioc_inlbuf1;
2689 desc->ld_tgt_count = 1;
2690 desc->ld_active_tgt_count = 1;
2691 desc->ld_default_stripe_count = 1;
2692 desc->ld_default_stripe_size = 0;
2693 desc->ld_default_stripe_offset = 0;
2694 desc->ld_pattern = 0;
2695 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2697 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2699 err = copy_to_user((void *)uarg, buf, len);
2702 obd_ioctl_freedata(buf, len);
2705 case LL_IOC_LOV_SETSTRIPE:
2706 err = obd_alloc_memmd(exp, karg);
2710 case LL_IOC_LOV_GETSTRIPE:
2711 err = osc_getstripe(karg, uarg);
2713 case OBD_IOC_CLIENT_RECOVER:
2714 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2715 data->ioc_inlbuf1, 0);
2719 case IOC_OSC_SET_ACTIVE:
2720 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2723 case OBD_IOC_POLL_QUOTACHECK:
2724 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2726 case OBD_IOC_PING_TARGET:
2727 err = ptlrpc_obd_ping(obd);
2730 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2731 cmd, current_comm());
/* Balance the try_module_get() taken on entry. */
2736 module_put(THIS_MODULE);
/*
 * Answer a get_info query identified by @key / @keylen; the result is
 * written through @val and its size through @vallen.
 *
 * Visible keys:
 *   KEY_LOCK_TO_STRIPE - answered locally; *vallen becomes sizeof(__u32).
 *   KEY_LAST_ID        - OST_GET_INFO RPC (no-delay, no-resend); the
 *                        RMF_OBD_ID value of the reply is stored through
 *                        @val as a u64.
 *   KEY_FIEMAP         - OST_GET_INFO RPC carrying the fiemap key and the
 *                        caller's buffer; *vallen bytes of the reply are
 *                        copied back into @val.  A cached PR/PW extent
 *                        lock is looked up first; without one the server
 *                        is asked to lock (OBD_FL_SRVLOCK).
 *
 * NOTE(review): the error returns, the value stored for
 * KEY_LOCK_TO_STRIPE, the branch taken when FIEMAP_FLAG_SYNC is clear and
 * the condition guarding the final ldlm_lock_decref() are not visible in
 * this listing.
 */
2740 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2741 u32 keylen, void *key, __u32 *vallen, void *val,
2742 struct lov_stripe_md *lsm)
2744 if (!vallen || !val)
2747 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2748 __u32 *stripe = val;
2749 *vallen = sizeof(*stripe);
2752 } else if (KEY_IS(KEY_LAST_ID)) {
2753 struct ptlrpc_request *req;
2758 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2759 &RQF_OST_GET_INFO_LAST_ID);
2763 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2764 RCL_CLIENT, keylen);
2765 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2767 ptlrpc_request_free(req);
/* Ship the key itself in the request body. */
2771 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2772 memcpy(tmp, key, keylen);
2774 req->rq_no_delay = req->rq_no_resend = 1;
2775 ptlrpc_request_set_replen(req);
2776 rc = ptlrpc_queue_wait(req);
2780 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
2781 if (reply == NULL) {
2786 *((u64 *)val) = *reply;
2788 ptlrpc_req_finished(req);
2790 } else if (KEY_IS(KEY_FIEMAP)) {
2791 struct ll_fiemap_info_key *fm_key =
2792 (struct ll_fiemap_info_key *)key;
2793 struct ldlm_res_id res_id;
2794 ldlm_policy_data_t policy;
2795 struct lustre_handle lockh;
2796 ldlm_mode_t mode = 0;
2797 struct ptlrpc_request *req;
2798 struct ll_user_fiemap *reply;
2802 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
2805 policy.l_extent.start = fm_key->fiemap.fm_start &
/* Clamp the extent end to OBD_OBJECT_EOF when start + length, rounded
 * up to a page, would reach it; the test is phrased as a subtraction
 * so the sum is never formed in that case. */
2808 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
2809 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
2810 policy.l_extent.end = OBD_OBJECT_EOF;
2812 policy.l_extent.end = (fm_key->fiemap.fm_start +
2813 fm_key->fiemap.fm_length +
2814 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
2816 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
2817 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
2818 LDLM_FL_BLOCK_GRANTED |
2820 &res_id, LDLM_EXTENT, &policy,
2821 LCK_PR | LCK_PW, &lockh, 0);
2822 if (mode) { /* lock is cached on client */
/* Normalise the held reference to PR so that the decref at the
 * end can always use LCK_PR. */
2823 if (mode != LCK_PR) {
2824 ldlm_lock_addref(&lockh, LCK_PR);
2825 ldlm_lock_decref(&lockh, LCK_PW);
2827 } else { /* no cached lock, needs acquire lock on server side */
2828 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
2829 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
2833 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2834 &RQF_OST_GET_INFO_FIEMAP);
/* The fiemap value buffer travels in both directions, so it is
 * sized for the client and the server side alike. */
2840 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
2841 RCL_CLIENT, keylen);
2842 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2843 RCL_CLIENT, *vallen);
2844 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
2845 RCL_SERVER, *vallen);
2847 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2849 ptlrpc_request_free(req);
2853 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
2854 memcpy(tmp, key, keylen);
2855 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2856 memcpy(tmp, val, *vallen);
2858 ptlrpc_request_set_replen(req);
2859 rc = ptlrpc_queue_wait(req);
2863 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
2864 if (reply == NULL) {
2869 memcpy(val, reply, *vallen);
2871 ptlrpc_req_finished(req);
2874 ldlm_lock_decref(&lockh, LCK_PR);
/*
 * Apply a set_info request identified by @key / @keylen with payload
 * @val / @vallen.
 *
 * Keys handled locally, without an RPC:
 *   KEY_CHECKSUM         - @val is an int; cl_checksum becomes 0 or 1.
 *   KEY_SPTLRPC_CONF     - sptlrpc_conf_client_adapt(obd).
 *   KEY_FLUSH_CTX        - sptlrpc_import_flush_my_ctx(imp).
 *   KEY_CACHE_SET        - attach this client to the shared
 *                          cl_client_cache passed in @val (allowed once).
 *   KEY_CACHE_LRU_SHRINK - shrink the LRU by at most half of
 *                          cl_lru_in_list, capped by the int in @val.
 *
 * Every other key is packed into an OST_SET_INFO request.
 * KEY_GRANT_SHRINK uses the RQF_OST_SET_GRANT_INFO format, gets
 * osc_shrink_grant_interpret() as reply handler and is handed to ptlrpcd.
 * All remaining keys are added to @set, which is then polled once with
 * ptlrpc_check_set().
 *
 * NOTE(review): the return statements, the error checks and several lines
 * of the GRANT_SHRINK branch are not visible in this listing.
 */
2881 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
2882 u32 keylen, void *key, u32 vallen,
2883 void *val, struct ptlrpc_request_set *set)
2885 struct ptlrpc_request *req;
2886 struct obd_device *obd = exp->exp_obd;
2887 struct obd_import *imp = class_exp2cliimp(exp);
2891 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
2893 if (KEY_IS(KEY_CHECKSUM)) {
2894 if (vallen != sizeof(int))
2896 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
2900 if (KEY_IS(KEY_SPTLRPC_CONF)) {
2901 sptlrpc_conf_client_adapt(obd);
2905 if (KEY_IS(KEY_FLUSH_CTX)) {
2906 sptlrpc_import_flush_my_ctx(imp);
2910 if (KEY_IS(KEY_CACHE_SET)) {
2911 struct client_obd *cli = &obd->u.cli;
2913 LASSERT(cli->cl_cache == NULL); /* only once */
2914 cli->cl_cache = (struct cl_client_cache *)val;
2915 atomic_inc(&cli->cl_cache->ccc_users);
2916 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
2918 /* add this osc into entity list */
2919 LASSERT(list_empty(&cli->cl_lru_osc));
2920 spin_lock(&cli->cl_cache->ccc_lru_lock);
2921 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
2922 spin_unlock(&cli->cl_cache->ccc_lru_lock);
2927 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
2928 struct client_obd *cli = &obd->u.cli;
/* Never offer more than half of the pages currently on the LRU. */
2929 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
2930 int target = *(int *)val;
2932 nr = osc_lru_shrink(cli, min(nr, target));
/* Apart from KEY_GRANT_SHRINK, every remaining key needs a request
 * set to queue the RPC on. */
2937 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
2940 /* We pass all other commands directly to OST. Since nobody calls osc
2941 methods directly and everybody is supposed to go through LOV, we
2942 assume lov checked invalid values for us.
2943 The only recognised values so far are evict_by_nid and mds_conn.
2944 Even if something bad goes through, we'd get a -EINVAL from OST
2947 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
2948 &RQF_OST_SET_GRANT_INFO :
2953 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2954 RCL_CLIENT, keylen);
2955 if (!KEY_IS(KEY_GRANT_SHRINK))
2956 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
2957 RCL_CLIENT, vallen);
2958 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
2960 ptlrpc_request_free(req);
2964 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
2965 memcpy(tmp, key, keylen);
2966 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
2969 memcpy(tmp, val, vallen);
2971 if (KEY_IS(KEY_GRANT_SHRINK)) {
2972 struct osc_brw_async_args *aa;
2975 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2976 aa = ptlrpc_req_async_args(req);
2979 ptlrpc_req_finished(req);
2982 *oa = ((struct ost_body *)val)->oa;
2984 req->rq_interpret_reply = osc_shrink_grant_interpret;
2987 ptlrpc_request_set_replen(req);
2988 if (!KEY_IS(KEY_GRANT_SHRINK)) {
2989 LASSERT(set != NULL);
2990 ptlrpc_set_add_req(set, req);
2991 ptlrpc_check_set(NULL, set);
2993 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
/*
 * Reconnect hook: prepare the grant field of the connect data.
 *
 * When @data is present and carries OBD_CONNECT_GRANT, ocd_grant is set
 * under cl_loi_list_lock to the client's available plus dirty grant, or to
 * two BRW sizes when that sum is zero.  cl_lost_grant is read and reset in
 * the same critical section, and the values are logged afterwards.
 *
 * NOTE(review): the last parameter of the signature, the declaration of
 * lost_grant and the return statement are not visible in this listing.
 */
2998 static int osc_reconnect(const struct lu_env *env,
2999 struct obd_export *exp, struct obd_device *obd,
3000 struct obd_uuid *cluuid,
3001 struct obd_connect_data *data,
3004 struct client_obd *cli = &obd->u.cli;
3006 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3009 client_obd_list_lock(&cli->cl_loi_list_lock);
/* "?:" is the GNU two-operand conditional: the sum is used when it is
 * non-zero, otherwise the fallback of two BRW sizes. */
3010 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3011 2 * cli_brw_size(obd);
3012 lost_grant = cli->cl_lost_grant;
3013 cli->cl_lost_grant = 0;
3014 client_obd_list_unlock(&cli->cl_loi_list_lock);
3016 CDEBUG(D_RPCTRACE, "ocd_connect_flags: %#llx ocd_version: %d"
3017 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3018 data->ocd_version, data->ocd_grant, lost_grant);
/*
 * Disconnect the export and, once the client import is gone, take this
 * client off the grant-shrink list.
 *
 * client_disconnect_export() runs first; osc_del_shrink_grant() is called
 * afterwards, and only when cl_import is NULL.  The text inside the
 * function explains why the order matters.  The return statement is not
 * visible in this listing.
 */
3024 static int osc_disconnect(struct obd_export *exp)
3026 struct obd_device *obd = class_exp2obd(exp);
3029 rc = client_disconnect_export(exp);
3031 * Initially we put del_shrink_grant before disconnect_export, but it
3032 * causes the following problem if setup (connect) and cleanup
3033 * (disconnect) are tangled together.
3034 * connect p1 disconnect p2
3035 * ptlrpc_connect_import
3036 * ............... class_manual_cleanup
3039 * ptlrpc_connect_interrupt
3041 * add this client to shrink list
3043 * Bang! pinger trigger the shrink.
3044 * So the osc should be disconnected from the shrink list, after we
3045 * are sure the import has been destroyed. BUG18662
3047 if (obd->u.cli.cl_import == NULL)
3048 osc_del_shrink_grant(&obd->u.cli);
/*
 * Import state-change callback for the OSC device.
 *
 * Visible handling per event:
 *   IMP_EVENT_DISCON     - zero cl_avail_grant and cl_lost_grant under
 *                          cl_loi_list_lock
 *   IMP_EVENT_INACTIVE   - notify the observer (OBD_NOTIFY_INACTIVE)
 *   IMP_EVENT_INVALIDATE - with a cl environment: unplug pending I/O, then
 *                          clean the namespace with LDLM_FL_LOCAL_ONLY
 *   IMP_EVENT_ACTIVE     - notify the observer (OBD_NOTIFY_ACTIVE)
 *   IMP_EVENT_OCD        - initialise grant when OBD_CONNECT_GRANT is set,
 *                          switch the request portal to OST_REQUEST_PORTAL
 *                          when OBD_CONNECT_REQPORTAL is set, then notify
 *                          the observer (OBD_NOTIFY_OCD)
 *   IMP_EVENT_DEACTIVATE / IMP_EVENT_ACTIVATE - notify the observer
 * An unknown event is reported with CERROR.
 *
 * NOTE(review): the switch statement, the assignment of cli, the check on
 * the result of cl_env_get() and the return are not visible in this
 * listing.
 */
3052 static int osc_import_event(struct obd_device *obd,
3053 struct obd_import *imp,
3054 enum obd_import_event event)
3056 struct client_obd *cli;
3059 LASSERT(imp->imp_obd == obd);
3062 case IMP_EVENT_DISCON: {
3064 client_obd_list_lock(&cli->cl_loi_list_lock);
3065 cli->cl_avail_grant = 0;
3066 cli->cl_lost_grant = 0;
3067 client_obd_list_unlock(&cli->cl_loi_list_lock);
3070 case IMP_EVENT_INACTIVE: {
3071 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3074 case IMP_EVENT_INVALIDATE: {
3075 struct ldlm_namespace *ns = obd->obd_namespace;
3079 env = cl_env_get(&refcheck);
3083 /* all pages go to failing rpcs due to the invalid
3085 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3087 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3088 cl_env_put(env, &refcheck);
3093 case IMP_EVENT_ACTIVE: {
3094 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3097 case IMP_EVENT_OCD: {
3098 struct obd_connect_data *ocd = &imp->imp_connect_data;
3100 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3101 osc_init_grant(&obd->u.cli, ocd);
3104 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3105 imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3107 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3110 case IMP_EVENT_DEACTIVATE: {
3111 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3114 case IMP_EVENT_ACTIVATE: {
3115 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3119 CERROR("Unknown import event %d\n", event);
3126 * Determine whether the lock can be canceled before replaying the lock
3127 * during recovery, see bug16774 for detailed information.
3129 * \retval zero the lock can't be canceled
3130 * \retval other ok to cancel
3132 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
/* This callback is registered with the namespace through
 * ns_register_cancel() in osc_setup().  check_res_locked() presumably
 * asserts that the caller holds the resource lock -- confirm. */
3134 check_res_locked(lock->l_resource);
3137 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3139 * XXX as a future improvement, we can also cancel unused write lock
3140 * if it doesn't have dirty data and active mmaps.
/* A lock qualifies when it is an extent lock, granted in PR or CR
 * mode, and osc_dlm_lock_pageref() reports zero for it.  The values
 * returned for either outcome are not visible in this listing. */
3142 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3143 (lock->l_granted_mode == LCK_PR ||
3144 lock->l_granted_mode == LCK_CR) &&
3145 (osc_dlm_lock_pageref(lock) == 0))
/*
 * Writeback work callback; osc_setup() registers it through
 * ptlrpcd_alloc_work() with the client_obd as @data.
 *
 * Logs the invocation and unplugs queued I/O for the client with
 * PDL_POLICY_SAME.  The return statement is not visible in this listing.
 */
3151 static int brw_queue_work(const struct lu_env *env, void *data)
3153 struct client_obd *cli = data;
3155 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3157 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
/*
 * Set up an OSC device.
 *
 * Visible steps, in order: take a ptlrpcd reference, run the generic
 * client_obd_setup(), allocate the writeback work item (brw_queue_work),
 * set up the quota cache, set the grant-shrink interval, register the
 * procfs entries, create the import's request pool, initialise the
 * grant-shrink list head and register osc_cancel_for_recovery() with the
 * namespace.
 *
 * The trailing ptlrpcd_destroy_work() / client_obd_cleanup() calls belong
 * to the error-unwind path reached through the out_ptlrpcd_work and
 * out_client_setup gotos.
 *
 * NOTE(review): the rc checks, the unwind labels themselves, one argument
 * of ptlrpc_init_rq_pool() and the return statements are not visible in
 * this listing.
 */
3161 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3163 struct lprocfs_static_vars lvars = { NULL };
3164 struct client_obd *cli = &obd->u.cli;
3168 rc = ptlrpcd_addref();
3172 rc = client_obd_setup(obd, lcfg);
3176 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3177 if (IS_ERR(handler)) {
3178 rc = PTR_ERR(handler);
3179 goto out_client_setup;
3181 cli->cl_writeback_work = handler;
3183 rc = osc_quota_setup(obd);
3185 goto out_ptlrpcd_work;
3187 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3188 lprocfs_osc_init_vars(&lvars);
/* The extra procfs attachments are made only when the base procfs
 * setup returned 0. */
3189 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3190 lproc_osc_attach_seqstat(obd);
3191 sptlrpc_lprocfs_cliobd_attach(obd);
3192 ptlrpc_lprocfs_register_obd(obd);
3195 /* We need to allocate a few requests more, because
3196 * brw_interpret tries to create new requests before freeing
3197 * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3198 * reserved, but I'm afraid that might be too much wasted RAM
3199 * in fact, so 2 is just my guess and still should work. */
3200 cli->cl_import->imp_rq_pool =
3201 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3203 ptlrpc_add_rqs_to_pool);
3205 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3206 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3210 ptlrpcd_destroy_work(handler);
3212 client_obd_cleanup(obd);
/*
 * Staged pre-cleanup of an OSC device.
 *
 *   OBD_CLEANUP_EARLY   - deactivate the client import and clear its
 *                         imp_pingable flag under imp_lock.
 *   OBD_CLEANUP_EXPORTS - wait for zombie exports (obd_zombie_barrier()),
 *                         destroy the writeback work item if one exists,
 *                         clean up the client import and remove the
 *                         procfs entries.
 *
 * NOTE(review): the switch on @stage and the return statement are not
 * visible in this listing.
 */
3218 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3221 case OBD_CLEANUP_EARLY: {
3222 struct obd_import *imp;
3223 imp = obd->u.cli.cl_import;
3224 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3225 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3226 ptlrpc_deactivate_import(imp);
3227 spin_lock(&imp->imp_lock);
3228 imp->imp_pingable = 0;
3229 spin_unlock(&imp->imp_lock);
3232 case OBD_CLEANUP_EXPORTS: {
3233 struct client_obd *cli = &obd->u.cli;
3235 * for echo client, export may be on zombie list, wait for
3236 * zombie thread to cull it, because cli.cl_import will be
3237 * cleared in client_disconnect_export():
3238 * class_export_destroy() -> obd_cleanup() ->
3239 * echo_device_free() -> echo_client_cleanup() ->
3240 * obd_disconnect() -> osc_disconnect() ->
3241 * client_disconnect_export()
3243 obd_zombie_barrier();
/* Clear the pointer after destroying the work item so that it is
 * not left dangling. */
3244 if (cli->cl_writeback_work) {
3245 ptlrpcd_destroy_work(cli->cl_writeback_work);
3246 cli->cl_writeback_work = NULL;
3248 obd_cleanup_client_import(obd);
3249 ptlrpc_lprocfs_unregister_obd(obd);
3250 lprocfs_obd_cleanup(obd);
/*
 * Final cleanup of an OSC device.
 *
 * If the client was attached to a shared cl_client_cache, it is unlinked
 * from the cache's LRU list under ccc_lru_lock, its user count on the
 * cache is dropped and the cache pointers are cleared.  The quota cache is
 * then freed and the generic client_obd_cleanup() runs.
 *
 * NOTE(review): whatever follows client_obd_cleanup(), including the
 * return, is not visible in this listing.
 */
3257 int osc_cleanup(struct obd_device *obd)
3259 struct client_obd *cli = &obd->u.cli;
/* Detach from the shared LRU cache, if one was attached. */
3263 if (cli->cl_cache != NULL) {
3264 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3265 spin_lock(&cli->cl_cache->ccc_lru_lock);
3266 list_del_init(&cli->cl_lru_osc);
3267 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3268 cli->cl_lru_left = NULL;
3269 atomic_dec(&cli->cl_cache->ccc_users);
3270 cli->cl_cache = NULL;
3273 /* free memory of osc quota cache */
3274 osc_quota_cleanup(obd);
3276 rc = client_obd_cleanup(obd);
/*
 * Apply a configuration record to the OSC device.
 *
 * The procfs variable table is initialised and the record is dispatched on
 * lcfg->lcfg_command; the one visible action hands the record to
 * class_process_proc_param() with the PARAM_OSC prefix.
 *
 * NOTE(review): the case labels, the remaining arguments of the call and
 * the return are not visible in this listing.
 */
3282 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3284 struct lprocfs_static_vars lvars = { NULL };
3287 lprocfs_osc_init_vars(&lvars);
3289 switch (lcfg->lcfg_command) {
3291 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
/*
 * o_process_config entry point: forwards @buf, treated as the lustre_cfg
 * record, to osc_process_config_base().  @len is not used.
 */
3301 static int osc_process_config(struct obd_device *obd, u32 len, void *buf)
3303 return osc_process_config_base(obd, buf);
/*
 * OBD method table for the OSC device type; osc_init() passes it to
 * class_register_type().  Connection management uses the generic client_*
 * helpers; the osc_* handlers are implemented in this file.
 */
3306 struct obd_ops osc_obd_ops = {
3307 .o_owner = THIS_MODULE,
3308 .o_setup = osc_setup,
3309 .o_precleanup = osc_precleanup,
3310 .o_cleanup = osc_cleanup,
3311 .o_add_conn = client_import_add_conn,
3312 .o_del_conn = client_import_del_conn,
3313 .o_connect = client_connect_import,
3314 .o_reconnect = osc_reconnect,
3315 .o_disconnect = osc_disconnect,
3316 .o_statfs = osc_statfs,
3317 .o_statfs_async = osc_statfs_async,
3318 .o_packmd = osc_packmd,
3319 .o_unpackmd = osc_unpackmd,
3320 .o_create = osc_create,
3321 .o_destroy = osc_destroy,
3322 .o_getattr = osc_getattr,
3323 .o_getattr_async = osc_getattr_async,
3324 .o_setattr = osc_setattr,
3325 .o_setattr_async = osc_setattr_async,
3326 .o_find_cbdata = osc_find_cbdata,
3327 .o_iocontrol = osc_iocontrol,
3328 .o_get_info = osc_get_info,
3329 .o_set_info_async = osc_set_info_async,
3330 .o_import_event = osc_import_event,
3331 .o_process_config = osc_process_config,
3332 .o_quotactl = osc_quotactl,
3333 .o_quotacheck = osc_quotacheck,
/*
 * Objects defined elsewhere in the OSC module and used by osc_init() and
 * osc_exit(): the kmem cache descriptors handed to lu_kmem_init() and
 * lu_kmem_fini(), and the AST guard spinlock together with its lockdep
 * class key.
 */
3336 extern struct lu_kmem_descr osc_caches[];
3337 extern spinlock_t osc_ast_guard;
3338 extern struct lock_class_key osc_ast_guard_class;
/*
 * Module entry point.
 *
 * Creates the OSC kmem caches, registers the OSC obd type (osc_obd_ops,
 * LUSTRE_OSC_NAME, osc_device_type) and initialises the AST guard
 * spinlock with its own lockdep class.  The lu_kmem_fini() call after the
 * registration undoes the cache setup; it presumably runs only when the
 * registration failed -- confirm.
 *
 * NOTE(review): the rc checks and the return statements are not visible
 * in this listing.
 */
3340 int __init osc_init(void)
3342 struct lprocfs_static_vars lvars = { NULL };
3345 /* print an address of _any_ initialized kernel symbol from this
3346 * module, to allow debugging with gdb that doesn't support data
3347 * symbols from modules.*/
3348 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3350 rc = lu_kmem_init(osc_caches);
3354 lprocfs_osc_init_vars(&lvars);
3356 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3357 LUSTRE_OSC_NAME, &osc_device_type);
3359 lu_kmem_fini(osc_caches);
3363 spin_lock_init(&osc_ast_guard);
3364 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
/*
 * Module exit: unregister the OSC obd type and release the kmem caches
 * that osc_init() created.
 */
3369 static void /*__exit*/ osc_exit(void)
3371 class_unregister_type(LUSTRE_OSC_NAME);
3372 lu_kmem_fini(osc_caches);
/* Module metadata, followed by the hookup of osc_init()/osc_exit() as the
 * module's load and unload handlers. */
3375 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3376 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3377 MODULE_LICENSE("GPL");
3378 MODULE_VERSION(LUSTRE_VERSION_STRING);
3380 module_init(osc_init);
3381 module_exit(osc_exit);