From e548e9b93d3e565e42b938a99804114565be1f81 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 10 Jun 2015 15:17:56 +0800 Subject: [PATCH] ceph: re-send flushing caps (which are revoked) in reconnect stage if flushing caps were revoked, we should re-send the cap flush in client reconnect stage. This guarantees that MDS processes the cap flush message before issuing the flushing caps to other client. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++---- fs/ceph/mds_client.c | 3 +++ fs/ceph/super.h | 7 +++++-- 3 files changed, 61 insertions(+), 6 deletions(-) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 420272788e01..69a16044ec41 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1486,6 +1486,7 @@ static int __mark_caps_flushing(struct inode *inode, cf = kmalloc(sizeof(*cf), GFP_ATOMIC); cf->caps = flushing; + cf->kick = false; spin_lock(&mdsc->cap_dirty_lock); list_del_init(&ci->i_dirty_item); @@ -2101,7 +2102,8 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, static int __kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, - struct ceph_inode_info *ci) + struct ceph_inode_info *ci, + bool kick_all) { struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; @@ -2127,7 +2129,9 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { cf = rb_entry(n, struct ceph_cap_flush, i_node); - if (cf->tid >= first_tid) + if (cf->tid < first_tid) + continue; + if (kick_all || cf->kick) break; } if (!n) { @@ -2136,6 +2140,8 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, } cf = rb_entry(n, struct ceph_cap_flush, i_node); + cf->kick = false; + first_tid = cf->tid + 1; dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, @@ -2149,6 +2155,49 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, return delayed; } +void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + struct ceph_inode_info *ci; + struct ceph_cap *cap; + struct ceph_cap_flush *cf; + struct rb_node *n; + + dout("early_kick_flushing_caps mds%d\n", session->s_mds); + list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { + spin_lock(&ci->i_ceph_lock); + cap = ci->i_auth_cap; + if (!(cap && cap->session == session)) { + pr_err("%p auth cap %p not mds%d ???\n", + &ci->vfs_inode, cap, session->s_mds); + spin_unlock(&ci->i_ceph_lock); + continue; + } + + + /* + * if flushing caps were revoked, we re-send the cap flush + * in client reconnect stage. This guarantees MDS * processes + * the cap flush message before issuing the flushing caps to + * other client. + */ + if ((cap->issued & ci->i_flushing_caps) != + ci->i_flushing_caps) { + spin_unlock(&ci->i_ceph_lock); + if (!__kick_flushing_caps(mdsc, session, ci, true)) + continue; + spin_lock(&ci->i_ceph_lock); + } + + for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + cf->kick = true; + } + + spin_unlock(&ci->i_ceph_lock); + } +} + void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { @@ -2158,7 +2207,7 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, dout("kick_flushing_caps mds%d\n", session->s_mds); list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { - int delayed = __kick_flushing_caps(mdsc, session, ci); + int delayed = __kick_flushing_caps(mdsc, session, ci, false); if (delayed) { spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); @@ -2191,7 +2240,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, spin_unlock(&ci->i_ceph_lock); - delayed = __kick_flushing_caps(mdsc, session, ci); + delayed = __kick_flushing_caps(mdsc, session, ci, true); if (delayed) { spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 31f6a78caa0a..89e4305a94d4 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2982,6 +2982,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, reply->hdr.data_len = cpu_to_le32(pagelist->length); ceph_msg_data_add_pagelist(reply, pagelist); + + ceph_early_kick_flushing_caps(mdsc, session); + ceph_con_send(&session->s_con, reply); mutex_unlock(&session->s_mutex); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 94d91471165f..e7f13f742357 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -189,9 +189,10 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) struct ceph_cap_flush { u64 tid; int caps; - struct rb_node g_node; + bool kick; + struct rb_node g_node; // global union { - struct rb_node i_node; + struct rb_node i_node; // inode struct list_head list; }; }; @@ -868,6 +869,8 @@ extern void ceph_queue_caps_release(struct inode *inode); extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); extern int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync); +extern void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session); extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, -- 2.11.0