OSDN Git Service

rds: use list structure to track information for zerocopy completion notification
author Sowmini Varadhan <sowmini.varadhan@oracle.com>
Tue, 6 Mar 2018 15:22:34 +0000 (07:22 -0800)
committer David S. Miller <davem@davemloft.net>
Wed, 7 Mar 2018 23:05:57 +0000 (18:05 -0500)
Commit 401910db4cd4 ("rds: deliver zerocopy completion notification
with data") removes support for zerocopy completion notification
on the sk_error_queue, thus we no longer need to track the cookie
information in sk_buff structures.

This commit replaces the struct sk_buff_head rs_zcookie_queue with
a simpler list that results in a smaller memory footprint as well
as more efficient memory allocation time.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Acked-by: Willem de Bruijn <willemb@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/rds/af_rds.c
net/rds/message.c
net/rds/rds.h
net/rds/recv.c

index f712610..ab751a1 100644 (file)
@@ -77,7 +77,7 @@ static int rds_release(struct socket *sock)
        rds_send_drop_to(rs, NULL);
        rds_rdma_drop_keys(rs);
        rds_notify_queue_get(rs, NULL);
-       __skb_queue_purge(&rs->rs_zcookie_queue);
+       rds_notify_msg_zcopy_purge(&rs->rs_zcookie_queue);
 
        spin_lock_bh(&rds_sock_lock);
        list_del_init(&rs->rs_item);
@@ -180,7 +180,7 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
        }
        if (!list_empty(&rs->rs_recv_queue) ||
            !list_empty(&rs->rs_notify_queue) ||
-           !skb_queue_empty(&rs->rs_zcookie_queue))
+           !list_empty(&rs->rs_zcookie_queue.zcookie_head))
                mask |= (EPOLLIN | EPOLLRDNORM);
        if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
                mask |= (EPOLLOUT | EPOLLWRNORM);
@@ -515,7 +515,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
        INIT_LIST_HEAD(&rs->rs_recv_queue);
        INIT_LIST_HEAD(&rs->rs_notify_queue);
        INIT_LIST_HEAD(&rs->rs_cong_list);
-       skb_queue_head_init(&rs->rs_zcookie_queue);
+       rds_message_zcopy_queue_init(&rs->rs_zcookie_queue);
        spin_lock_init(&rs->rs_rdma_lock);
        rs->rs_rdma_keys = RB_ROOT;
        rs->rs_rx_traces = 0;
index c36edbb..90dcdcf 100644 (file)
@@ -48,7 +48,6 @@ static unsigned int   rds_exthdr_size[__RDS_EXTHDR_MAX] = {
 [RDS_EXTHDR_GEN_NUM]   = sizeof(u32),
 };
 
-
 void rds_message_addref(struct rds_message *rm)
 {
        rdsdebug("addref rm %p ref %d\n", rm, refcount_read(&rm->m_refcount));
@@ -56,9 +55,9 @@ void rds_message_addref(struct rds_message *rm)
 }
 EXPORT_SYMBOL_GPL(rds_message_addref);
 
-static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
+static inline bool rds_zcookie_add(struct rds_msg_zcopy_info *info, u32 cookie)
 {
-       struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb;
+       struct rds_zcopy_cookies *ck = &info->zcookies;
        int ncookies = ck->num;
 
        if (ncookies == RDS_MAX_ZCOOKIES)
@@ -68,38 +67,61 @@ static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
        return true;
 }
 
+/* Map an embedded znotifier back to the rds_msg_zcopy_info that holds it. */
+struct rds_msg_zcopy_info *rds_info_from_znotifier(struct rds_znotifier *znotif)
+{
+       struct rds_msg_zcopy_info *owner;
+
+       owner = container_of(znotif, struct rds_msg_zcopy_info, znotif);
+       return owner;
+}
+
+/*
+ * Drop every pending zerocopy completion entry queued on @q and free it.
+ */
+void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *q)
+{
+       struct rds_msg_zcopy_info *entry, *next;
+       unsigned long irqflags;
+       LIST_HEAD(detached);
+
+       /* Move the whole list to a private head while holding the lock. */
+       spin_lock_irqsave(&q->lock, irqflags);
+       list_splice_init(&q->zcookie_head, &detached);
+       spin_unlock_irqrestore(&q->lock, irqflags);
+
+       /* The entries are now private to us; release them unlocked. */
+       list_for_each_entry_safe(entry, next, &detached, rs_zcookie_next) {
+               list_del(&entry->rs_zcookie_next);
+               kfree(entry);
+       }
+}
+
+/*
+ * Queue the completion cookie of a finished zerocopy send on the socket.
+ *
+ * The cookie is appended to the entry at the tail of rs->rs_zcookie_queue
+ * when rds_zcookie_add() accepts it.  Otherwise the rds_msg_zcopy_info that
+ * embeds @znotif is recycled as a fresh entry and linked at the tail.
+ * The info is either freed or handed over to the queue here, so the caller
+ * must not touch @znotif afterwards.
+ */
 static void rds_rm_zerocopy_callback(struct rds_sock *rs,
                                      struct rds_znotifier *znotif)
 {
-       struct sk_buff *skb, *tail;
-       unsigned long flags;
-       struct sk_buff_head *q;
+       struct rds_msg_zcopy_info *info;
+       struct rds_msg_zcopy_queue *q;
        u32 cookie = znotif->z_cookie;
        struct rds_zcopy_cookies *ck;
+       struct list_head *head;
+       unsigned long flags;
 
+       /*
+        * znotif shares a union with zcookies: finish with z_mmp before
+        * the memset() below overwrites that storage.
+        */
+       mm_unaccount_pinned_pages(&znotif->z_mmp);
        q = &rs->rs_zcookie_queue;
        spin_lock_irqsave(&q->lock, flags);
-       tail = skb_peek_tail(q);
-
-       if (tail && skb_zcookie_add(tail, cookie)) {
-               spin_unlock_irqrestore(&q->lock, flags);
-               mm_unaccount_pinned_pages(&znotif->z_mmp);
-               consume_skb(rds_skb_from_znotifier(znotif));
-               /* caller invokes rds_wake_sk_sleep() */
-               return;
+       head = &q->zcookie_head;
+       if (!list_empty(head)) {
+               /* The list head is not an entry; look at the real tail. */
+               info = list_last_entry(head, struct rds_msg_zcopy_info,
+                                      rs_zcookie_next);
+               if (rds_zcookie_add(info, cookie)) {
+                       spin_unlock_irqrestore(&q->lock, flags);
+                       kfree(rds_info_from_znotifier(znotif));
+                       /* caller invokes rds_wake_sk_sleep() */
+                       return;
+               }
        }
 
-       skb = rds_skb_from_znotifier(znotif);
-       ck = (struct rds_zcopy_cookies *)skb->cb;
+       info = rds_info_from_znotifier(znotif);
+       ck = &info->zcookies;
        memset(ck, 0, sizeof(*ck));
-       WARN_ON(!skb_zcookie_add(skb, cookie));
-
-       __skb_queue_tail(q, skb);
+       WARN_ON(!rds_zcookie_add(info, cookie));
+       /* list_add_tail(new, head): link the new entry onto the queue. */
+       list_add_tail(&info->rs_zcookie_next, &q->zcookie_head);
 
        spin_unlock_irqrestore(&q->lock, flags);
        /* caller invokes rds_wake_sk_sleep() */
-
-       mm_unaccount_pinned_pages(&znotif->z_mmp);
 }
 
 /*
@@ -340,7 +362,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
        int ret = 0;
        int length = iov_iter_count(from);
        int total_copied = 0;
-       struct sk_buff *skb;
+       struct rds_msg_zcopy_info *info;
 
        rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
 
@@ -350,12 +372,11 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
        sg = rm->data.op_sg;
        sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
 
-       skb = alloc_skb(0, GFP_KERNEL);
-       if (!skb)
+       info = kzalloc(sizeof(*info), GFP_KERNEL);
+       if (!info)
                return -ENOMEM;
-       BUILD_BUG_ON(sizeof(skb->cb) < max_t(int, sizeof(struct rds_znotifier),
-                                            sizeof(struct rds_zcopy_cookies)));
-       rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
+       INIT_LIST_HEAD(&info->rs_zcookie_next);
+       rm->data.op_mmp_znotifier = &info->znotif;
        if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
                                    length)) {
                ret = -ENOMEM;
@@ -389,7 +410,7 @@ int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
        WARN_ON_ONCE(length != 0);
        return ret;
 err:
-       consume_skb(skb);
+       kfree(info);
        rm->data.op_mmp_znotifier = NULL;
        return ret;
 }
index 33b1635..74cd27c 100644 (file)
@@ -357,16 +357,27 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
 #define RDS_MSG_FLUSH          8
 
 struct rds_znotifier {
-       struct list_head        z_list;
        struct mmpin            z_mmp;
        u32                     z_cookie;
 };
 
-#define        RDS_ZCOPY_SKB(__skb)    ((struct rds_znotifier *)&((__skb)->cb[0]))
+/*
+ * One node of a socket's zerocopy-completion list, linked through
+ * rs_zcookie_next.  znotif and zcookies overlay the same storage, so any
+ * znotif field that is still needed must be read before zcookies is written.
+ */
+struct rds_msg_zcopy_info {
+       struct list_head rs_zcookie_next;
+       union {
+               struct rds_znotifier znotif;
+               struct rds_zcopy_cookies zcookies;
+       };
+};
 
-static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z)
+/* List of rds_msg_zcopy_info entries together with the lock guarding it. */
+struct rds_msg_zcopy_queue {
+       struct list_head zcookie_head;
+       spinlock_t lock; /* protects zcookie_head queue */
+};
+
+/* Set up an empty queue: initialise the lock and the list head. */
+static inline void rds_message_zcopy_queue_init(struct rds_msg_zcopy_queue *q)
 {
-       return container_of((void *)z, struct sk_buff, cb);
+       spin_lock_init(&q->lock);
+       INIT_LIST_HEAD(&q->zcookie_head);
 }
 
 struct rds_message {
@@ -603,8 +614,7 @@ struct rds_sock {
        /* Socket receive path trace points*/
        u8                      rs_rx_traces;
        u8                      rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
-
-       struct sk_buff_head     rs_zcookie_queue;
+       struct rds_msg_zcopy_queue rs_zcookie_queue;
 };
 
 static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
@@ -803,6 +813,7 @@ void rds_message_addref(struct rds_message *rm);
 void rds_message_put(struct rds_message *rm);
 void rds_message_wait(struct rds_message *rm);
 void rds_message_unmapped(struct rds_message *rm);
+void rds_notify_msg_zcopy_purge(struct rds_msg_zcopy_queue *info);
 
 static inline void rds_message_make_checksum(struct rds_header *hdr)
 {
index d507477..de50e21 100644 (file)
@@ -579,9 +579,10 @@ out:
 
 static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
 {
-       struct sk_buff *skb;
-       struct sk_buff_head *q = &rs->rs_zcookie_queue;
+       struct rds_msg_zcopy_queue *q = &rs->rs_zcookie_queue;
+       struct rds_msg_zcopy_info *info = NULL;
        struct rds_zcopy_cookies *done;
+       unsigned long flags;
 
        if (!msg->msg_control)
                return false;
@@ -590,16 +591,24 @@ static bool rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
            msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
                return false;
 
-       skb = skb_dequeue(q);
-       if (!skb)
+       spin_lock_irqsave(&q->lock, flags);
+       if (!list_empty(&q->zcookie_head)) {
+               info = list_entry(q->zcookie_head.next,
+                                 struct rds_msg_zcopy_info, rs_zcookie_next);
+               list_del(&info->rs_zcookie_next);
+       }
+       spin_unlock_irqrestore(&q->lock, flags);
+       if (!info)
                return false;
-       done = (struct rds_zcopy_cookies *)skb->cb;
+       done = &info->zcookies;
        if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
                     done)) {
-               skb_queue_head(q, skb);
+               spin_lock_irqsave(&q->lock, flags);
+               list_add(&info->rs_zcookie_next, &q->zcookie_head);
+               spin_unlock_irqrestore(&q->lock, flags);
                return false;
        }
-       consume_skb(skb);
+       kfree(info);
        return true;
 }