OSDN Git Service

tipc: refactor function filter_rcv()
authorJon Maloy <jon.maloy@ericsson.com>
Fri, 13 Oct 2017 09:04:20 +0000 (11:04 +0200)
committerDavid S. Miller <davem@davemloft.net>
Fri, 13 Oct 2017 15:46:00 +0000 (08:46 -0700)
In the following commits we will need to handle multiple incoming and
rejected/returned buffers in the function socket.c::filter_rcv().
As a preparation for this, we generalize the function by handling
buffer queues instead of individual buffers. We also introduce a
help function tipc_skb_reject(), and rename filter_rcv() to
tipc_sk_filter_rcv() in line with other functions in socket.c.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/msg.c
net/tipc/msg.h
net/tipc/socket.c

index 17146c1..1649d45 100644 (file)
@@ -666,3 +666,10 @@ void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
        }
        kfree_skb(skb);
 }
+
+void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
+                    struct sk_buff_head *xmitq)
+{
+       if (tipc_msg_reverse(tipc_own_addr(net), &skb, err))
+               __skb_queue_tail(xmitq, skb);
+}
index d058b1c..be3e38a 100644 (file)
@@ -819,6 +819,8 @@ static inline bool msg_is_reset(struct tipc_msg *hdr)
 struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp);
 bool tipc_msg_validate(struct sk_buff *skb);
 bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err);
+void tipc_skb_reject(struct net *net, int err, struct sk_buff *skb,
+                    struct sk_buff_head *xmitq);
 void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,
                   u32 hsize, u32 destnode);
 struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
index 7659e79..bc226f5 100644 (file)
@@ -111,7 +111,7 @@ struct tipc_sock {
        struct rcu_head rcu;
 };
 
-static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
+static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
 static void tipc_data_ready(struct sock *sk);
 static void tipc_write_space(struct sock *sk);
 static void tipc_sock_destruct(struct sock *sk);
@@ -453,7 +453,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
        msg_set_origport(msg, tsk->portid);
        setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
        sk->sk_shutdown = 0;
-       sk->sk_backlog_rcv = tipc_backlog_rcv;
+       sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
        sk->sk_rcvbuf = sysctl_tipc_rmem[1];
        sk->sk_data_ready = tipc_data_ready;
        sk->sk_write_space = tipc_write_space;
@@ -850,16 +850,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 }
 
 /**
- * tipc_sk_proto_rcv - receive a connection mng protocol message
+ * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
  * @skb: pointer to message buffer.
  */
-static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
-                             struct sk_buff_head *xmitq)
+static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
+                                  struct sk_buff_head *xmitq)
 {
-       struct sock *sk = &tsk->sk;
-       u32 onode = tsk_own_node(tsk);
        struct tipc_msg *hdr = buf_msg(skb);
+       u32 onode = tsk_own_node(tsk);
+       struct sock *sk = &tsk->sk;
        int mtyp = msg_type(hdr);
        bool conn_cong;
 
@@ -1536,14 +1536,41 @@ static void tipc_sock_destruct(struct sock *sk)
        __skb_queue_purge(&sk->sk_receive_queue);
 }
 
+static void tipc_sk_proto_rcv(struct sock *sk,
+                             struct sk_buff_head *inputq,
+                             struct sk_buff_head *xmitq)
+{
+       struct sk_buff *skb = __skb_dequeue(inputq);
+       struct tipc_sock *tsk = tipc_sk(sk);
+       struct tipc_msg *hdr = buf_msg(skb);
+
+       switch (msg_user(hdr)) {
+       case CONN_MANAGER:
+               tipc_sk_conn_proto_rcv(tsk, skb, xmitq);
+               return;
+       case SOCK_WAKEUP:
+               u32_del(&tsk->cong_links, msg_orignode(hdr));
+               tsk->cong_link_cnt--;
+               sk->sk_write_space(sk);
+               break;
+       case TOP_SRV:
+               tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
+               break;
+       default:
+               break;
+       }
+
+       kfree_skb(skb);
+}
+
 /**
- * filter_connect - Handle all incoming messages for a connection-based socket
+ * tipc_filter_connect - Handle incoming message for a connection-based socket
  * @tsk: TIPC socket
  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
  *
  * Returns true if everything ok, false otherwise
  */
-static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
+static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
 {
        struct sock *sk = &tsk->sk;
        struct net *net = sock_net(sk);
@@ -1657,7 +1684,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
 }
 
 /**
- * filter_rcv - validate incoming message
+ * tipc_sk_filter_rcv - validate incoming message
  * @sk: socket
  * @skb: pointer to message.
  *
@@ -1666,75 +1693,49 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
  *
  * Called with socket lock already taken
  *
- * Returns true if message was added to socket receive queue, otherwise false
  */
-static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
-                      struct sk_buff_head *xmitq)
+static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
+                              struct sk_buff_head *xmitq)
 {
+       bool sk_conn = !tipc_sk_type_connectionless(sk);
        struct tipc_sock *tsk = tipc_sk(sk);
        struct tipc_msg *hdr = buf_msg(skb);
-       unsigned int limit = rcvbuf_limit(sk, skb);
-       int err = TIPC_OK;
+       struct net *net = sock_net(sk);
+       struct sk_buff_head inputq;
+       int limit, err = TIPC_OK;
 
-       if (unlikely(!msg_isdata(hdr))) {
-               switch (msg_user(hdr)) {
-               case CONN_MANAGER:
-                       tipc_sk_proto_rcv(tsk, skb, xmitq);
-                       return false;
-               case SOCK_WAKEUP:
-                       u32_del(&tsk->cong_links, msg_orignode(hdr));
-                       tsk->cong_link_cnt--;
-                       sk->sk_write_space(sk);
-                       break;
-               case TOP_SRV:
-                       tipc_sk_top_evt(tsk, (void *)msg_data(hdr));
-                       break;
-               default:
-                       break;
-               }
-               kfree_skb(skb);
-               return false;
-       }
+       TIPC_SKB_CB(skb)->bytes_read = 0;
+       __skb_queue_head_init(&inputq);
+       __skb_queue_tail(&inputq, skb);
 
-       /* Drop if illegal message type */
-       if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
-               kfree_skb(skb);
-               return false;
-       }
+       if (unlikely(!msg_isdata(hdr)))
+               tipc_sk_proto_rcv(sk, &inputq, xmitq);
+       else if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG))
+               return kfree_skb(skb);
 
-       /* Reject if wrong message type for current socket state */
-       if (tipc_sk_type_connectionless(sk)) {
-               if (msg_connected(hdr)) {
+       /* Validate and add to receive buffer if there is space */
+       while ((skb = __skb_dequeue(&inputq))) {
+               hdr = buf_msg(skb);
+               limit = rcvbuf_limit(sk, skb);
+               if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
+                   (!sk_conn && msg_connected(hdr)))
                        err = TIPC_ERR_NO_PORT;
-                       goto reject;
-               }
-       } else if (unlikely(!filter_connect(tsk, skb))) {
-               err = TIPC_ERR_NO_PORT;
-               goto reject;
-       }
+               else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit)
+                       err = TIPC_ERR_OVERLOAD;
 
-       /* Reject message if there isn't room to queue it */
-       if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
-               err = TIPC_ERR_OVERLOAD;
-               goto reject;
+               if (unlikely(err)) {
+                       tipc_skb_reject(net, err, skb, xmitq);
+                       err = TIPC_OK;
+                       continue;
+               }
+               __skb_queue_tail(&sk->sk_receive_queue, skb);
+               skb_set_owner_r(skb, sk);
+               sk->sk_data_ready(sk);
        }
-
-       /* Enqueue message */
-       TIPC_SKB_CB(skb)->bytes_read = 0;
-       __skb_queue_tail(&sk->sk_receive_queue, skb);
-       skb_set_owner_r(skb, sk);
-
-       sk->sk_data_ready(sk);
-       return true;
-
-reject:
-       if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
-               __skb_queue_tail(xmitq, skb);
-       return false;
 }
 
 /**
- * tipc_backlog_rcv - handle incoming message from backlog queue
+ * tipc_sk_backlog_rcv - handle incoming message from backlog queue
  * @sk: socket
  * @skb: message
  *
@@ -1742,27 +1743,25 @@ reject:
  *
  * Returns 0
  */
-static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
+static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
-       unsigned int truesize = skb->truesize;
+       unsigned int before = sk_rmem_alloc_get(sk);
        struct sk_buff_head xmitq;
        u32 dnode, selector;
+       unsigned int added;
 
        __skb_queue_head_init(&xmitq);
 
-       if (likely(filter_rcv(sk, skb, &xmitq))) {
-               atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
-               return 0;
-       }
-
-       if (skb_queue_empty(&xmitq))
-               return 0;
+       tipc_sk_filter_rcv(sk, skb, &xmitq);
+       added = sk_rmem_alloc_get(sk) - before;
+       atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
 
-       /* Send response/rejected message */
-       skb = __skb_dequeue(&xmitq);
-       dnode = msg_destnode(buf_msg(skb));
-       selector = msg_origport(buf_msg(skb));
-       tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
+       /* Send pending response/rejected messages, if any */
+       while ((skb = __skb_dequeue(&xmitq))) {
+               selector = msg_origport(buf_msg(skb));
+               dnode = msg_destnode(buf_msg(skb));
+               tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
+       }
        return 0;
 }
 
@@ -1794,7 +1793,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
 
                /* Add message directly to receive queue if possible */
                if (!sock_owned_by_user(sk)) {
-                       filter_rcv(sk, skb, xmitq);
+                       tipc_sk_filter_rcv(sk, skb, xmitq);
                        continue;
                }