OSDN Git Service

tipc: make macro tipc_wait_for_cond() smp safe
[uclinux-h8/linux.git] / net / tipc / socket.c
index 800caaa..1b92b72 100644 (file)
@@ -35,6 +35,8 @@
  */
 
 #include <linux/rhashtable.h>
+#include <linux/sched/signal.h>
+
 #include "core.h"
 #include "name_table.h"
 #include "node.h"
@@ -49,6 +51,7 @@
 #define TIPC_FWD_MSG           1
 #define TIPC_MAX_PORT          0xffffffff
 #define TIPC_MIN_PORT          1
+#define TIPC_ACK_RATE          4       /* ACK at 1/4 of of rcv window size */
 
 enum {
        TIPC_LISTEN = TCP_LISTEN,
@@ -67,16 +70,19 @@ enum {
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
  * @portid: unique port identity in TIPC socket hash table
  * @phdr: preformatted message header used when sending messages
+ * #cong_links: list of congested links
  * @publications: list of publications for port
+ * @blocking_link: address of the congested link we are currently sleeping on
  * @pub_count: total # of publications port has made during its lifetime
  * @probing_state:
  * @conn_timeout: the time we can wait for an unresponded setup request
  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
- * @link_cong: non-zero if owner must sleep because of link congestion
+ * @cong_link_cnt: number of congested links
  * @sent_unacked: # messages sent by socket, and not yet acked by peer
  * @rcv_unacked: # messages read by user, but not yet acked back to peer
  * @peer: 'connected' peer for dgram/rdm
  * @node: hash table node
+ * @mc_method: cookie for use between socket and broadcast layer
  * @rcu: rcu struct for tipc_sock
  */
 struct tipc_sock {
@@ -87,13 +93,13 @@ struct tipc_sock {
        u32 max_pkt;
        u32 portid;
        struct tipc_msg phdr;
-       struct list_head sock_list;
+       struct list_head cong_links;
        struct list_head publications;
        u32 pub_count;
        uint conn_timeout;
        atomic_t dupl_rcvcnt;
        bool probe_unacked;
-       bool link_cong;
+       u16 cong_link_cnt;
        u16 snt_unacked;
        u16 snd_win;
        u16 peer_caps;
@@ -101,6 +107,7 @@ struct tipc_sock {
        u16 rcv_win;
        struct sockaddr_tipc peer;
        struct rhash_head node;
+       struct tipc_mc_method mc_method;
        struct rcu_head rcu;
 };
 
@@ -109,8 +116,8 @@ static void tipc_data_ready(struct sock *sk);
 static void tipc_write_space(struct sock *sk);
 static void tipc_sock_destruct(struct sock *sk);
 static int tipc_release(struct socket *sock);
-static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
-static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
+static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
+                      bool kern);
 static void tipc_sk_timeout(unsigned long data);
 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
                           struct tipc_name_seq const *seq);
@@ -119,8 +126,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
 static int tipc_sk_insert(struct tipc_sock *tsk);
 static void tipc_sk_remove(struct tipc_sock *tsk);
-static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
-                             size_t dsz);
+static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
 
 static const struct proto_ops packet_ops;
@@ -334,6 +340,49 @@ static int tipc_set_sk_state(struct sock *sk, int state)
        return res;
 }
 
+static int tipc_sk_sock_err(struct socket *sock, long *timeout)
+{
+       struct sock *sk = sock->sk;
+       int err = sock_error(sk);
+       int typ = sock->type;
+
+       if (err)
+               return err;
+       if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
+               if (sk->sk_state == TIPC_DISCONNECTING)
+                       return -EPIPE;
+               else if (!tipc_sk_connected(sk))
+                       return -ENOTCONN;
+       }
+       if (!*timeout)
+               return -EAGAIN;
+       if (signal_pending(current))
+               return sock_intr_errno(*timeout);
+
+       return 0;
+}
+
+#define tipc_wait_for_cond(sock_, timeo_, condition_)                         \
+({                                                                             \
+       struct sock *sk_;                                                      \
+       int rc_;                                                               \
+                                                                              \
+       while ((rc_ = !(condition_))) {                                        \
+               DEFINE_WAIT_FUNC(wait_, woken_wake_function);                  \
+               sk_ = (sock_)->sk;                                             \
+               rc_ = tipc_sk_sock_err((sock_), timeo_);                       \
+               if (rc_)                                                       \
+                       break;                                                 \
+               prepare_to_wait(sk_sleep(sk_), &wait_, TASK_INTERRUPTIBLE);    \
+               release_sock(sk_);                                             \
+               *(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \
+               sched_annotate_sleep();                                        \
+               lock_sock(sk_);                                                \
+               remove_wait_queue(sk_sleep(sk_), &wait_);                      \
+       }                                                                      \
+       rc_;                                                                   \
+})
+
 /**
  * tipc_sk_create - create a TIPC socket
  * @net: network namespace (must be default network)
@@ -382,10 +431,9 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
        tsk = tipc_sk(sk);
        tsk->max_pkt = MAX_PKT_DEFAULT;
        INIT_LIST_HEAD(&tsk->publications);
+       INIT_LIST_HEAD(&tsk->cong_links);
        msg = &tsk->phdr;
        tn = net_generic(sock_net(sk), tipc_net_id);
-       tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
-                     NAMED_H_SIZE, 0);
 
        /* Finish initializing socket data structures */
        sock->ops = ops;
@@ -395,6 +443,13 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
                pr_warn("Socket create failed; port number exhausted\n");
                return -EINVAL;
        }
+
+       /* Ensure tsk is visible before we read own_addr. */
+       smp_mb();
+
+       tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
+                     NAMED_H_SIZE, 0);
+
        msg_set_origport(msg, tsk->portid);
        setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
        sk->sk_shutdown = 0;
@@ -432,9 +487,14 @@ static void __tipc_shutdown(struct socket *sock, int error)
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
        struct net *net = sock_net(sk);
+       long timeout = CONN_TIMEOUT_DEFAULT;
        u32 dnode = tsk_peer_node(tsk);
        struct sk_buff *skb;
 
+       /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
+       tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
+                                           !tsk_conn_cong(tsk)));
+
        /* Reject all unreceived messages, except on an active connection
         * (which disconnects locally & sends a 'FIN+' to peer).
         */
@@ -505,7 +565,8 @@ static int tipc_release(struct socket *sock)
 
        /* Reject any messages that accumulated in backlog queue */
        release_sock(sk);
-
+       u32_list_purge(&tsk->cong_links);
+       tsk->cong_link_cnt = 0;
        call_rcu(&tsk->rcu, tipc_sk_callback);
        sock->sk = NULL;
 
@@ -648,7 +709,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
 
        switch (sk->sk_state) {
        case TIPC_ESTABLISHED:
-               if (!tsk->link_cong && !tsk_conn_cong(tsk))
+               if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
                        mask |= POLLOUT;
                /* fall thru' */
        case TIPC_LISTEN:
@@ -657,7 +718,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
                        mask |= (POLLIN | POLLRDNORM);
                break;
        case TIPC_OPEN:
-               if (!tsk->link_cong)
+               if (!tsk->cong_link_cnt)
                        mask |= POLLOUT;
                if (tipc_sk_type_connectionless(sk) &&
                    (!skb_queue_empty(&sk->sk_receive_queue)))
@@ -676,63 +737,60 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
  * @sock: socket structure
  * @seq: destination address
  * @msg: message to send
- * @dsz: total length of message data
- * @timeo: timeout to wait for wakeup
+ * @dlen: length of data to send
+ * @timeout: timeout to wait for wakeup
  *
  * Called from function tipc_sendmsg(), which has done all sanity checks
  * Returns the number of bytes sent on success, or errno
  */
 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
-                         struct msghdr *msg, size_t dsz, long timeo)
+                         struct msghdr *msg, size_t dlen, long timeout)
 {
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
+       struct tipc_msg *hdr = &tsk->phdr;
        struct net *net = sock_net(sk);
-       struct tipc_msg *mhdr = &tsk->phdr;
-       struct sk_buff_head pktchain;
-       struct iov_iter save = msg->msg_iter;
-       uint mtu;
+       int mtu = tipc_bcast_get_mtu(net);
+       struct tipc_mc_method *method = &tsk->mc_method;
+       u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
+       struct sk_buff_head pkts;
+       struct tipc_nlist dsts;
        int rc;
 
-       if (!timeo && tsk->link_cong)
-               return -ELINKCONG;
+       /* Block or return if any destination link is congested */
+       rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
+       if (unlikely(rc))
+               return rc;
 
-       msg_set_type(mhdr, TIPC_MCAST_MSG);
-       msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
-       msg_set_destport(mhdr, 0);
-       msg_set_destnode(mhdr, 0);
-       msg_set_nametype(mhdr, seq->type);
-       msg_set_namelower(mhdr, seq->lower);
-       msg_set_nameupper(mhdr, seq->upper);
-       msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
+       /* Lookup destination nodes */
+       tipc_nlist_init(&dsts, tipc_own_addr(net));
+       tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
+                                     seq->upper, domain, &dsts);
+       if (!dsts.local && !dsts.remote)
+               return -EHOSTUNREACH;
 
-       skb_queue_head_init(&pktchain);
+       /* Build message header */
+       msg_set_type(hdr, TIPC_MCAST_MSG);
+       msg_set_hdr_sz(hdr, MCAST_H_SIZE);
+       msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
+       msg_set_destport(hdr, 0);
+       msg_set_destnode(hdr, 0);
+       msg_set_nametype(hdr, seq->type);
+       msg_set_namelower(hdr, seq->lower);
+       msg_set_nameupper(hdr, seq->upper);
 
-new_mtu:
-       mtu = tipc_bcast_get_mtu(net);
-       rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
-       if (unlikely(rc < 0))
-               return rc;
+       /* Build message as chain of buffers */
+       skb_queue_head_init(&pkts);
+       rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
 
-       do {
-               rc = tipc_bcast_xmit(net, &pktchain);
-               if (likely(!rc))
-                       return dsz;
-
-               if (rc == -ELINKCONG) {
-                       tsk->link_cong = 1;
-                       rc = tipc_wait_for_sndmsg(sock, &timeo);
-                       if (!rc)
-                               continue;
-               }
-               __skb_queue_purge(&pktchain);
-               if (rc == -EMSGSIZE) {
-                       msg->msg_iter = save;
-                       goto new_mtu;
-               }
-               break;
-       } while (1);
-       return rc;
+       /* Send message if build was successful */
+       if (unlikely(rc == dlen))
+               rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
+                                    &tsk->cong_link_cnt);
+
+       tipc_nlist_purge(&dsts);
+
+       return rc ? rc : dlen;
 }
 
 /**
@@ -746,7 +804,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
                       struct sk_buff_head *inputq)
 {
        struct tipc_msg *msg;
-       struct tipc_plist dports;
+       struct list_head dports;
        u32 portid;
        u32 scope = TIPC_CLUSTER_SCOPE;
        struct sk_buff_head tmpq;
@@ -754,7 +812,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
        struct sk_buff *skb, *_skb;
 
        __skb_queue_head_init(&tmpq);
-       tipc_plist_init(&dports);
+       INIT_LIST_HEAD(&dports);
 
        skb = tipc_skb_peek(arrvq, &inputq->lock);
        for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
@@ -768,8 +826,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
                tipc_nametbl_mc_translate(net,
                                          msg_nametype(msg), msg_namelower(msg),
                                          msg_nameupper(msg), scope, &dports);
-               portid = tipc_plist_pop(&dports);
-               for (; portid; portid = tipc_plist_pop(&dports)) {
+               portid = u32_pop(&dports);
+               for (; portid; portid = u32_pop(&dports)) {
                        _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
                        if (_skb) {
                                msg_set_destport(buf_msg(_skb), portid);
@@ -809,6 +867,14 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
        if (!tsk_peer_msg(tsk, hdr))
                goto exit;
 
+       if (unlikely(msg_errcode(hdr))) {
+               tipc_set_sk_state(sk, TIPC_DISCONNECTING);
+               tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
+                                     tsk_peer_port(tsk));
+               sk->sk_state_change(sk);
+               goto exit;
+       }
+
        tsk->probe_unacked = false;
 
        if (mtyp == CONN_PROBE) {
@@ -830,31 +896,6 @@ exit:
        kfree_skb(skb);
 }
 
-static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
-{
-       DEFINE_WAIT_FUNC(wait, woken_wake_function);
-       struct sock *sk = sock->sk;
-       struct tipc_sock *tsk = tipc_sk(sk);
-       int done;
-
-       do {
-               int err = sock_error(sk);
-               if (err)
-                       return err;
-               if (sk->sk_shutdown & SEND_SHUTDOWN)
-                       return -EPIPE;
-               if (!*timeo_p)
-                       return -EAGAIN;
-               if (signal_pending(current))
-                       return sock_intr_errno(*timeo_p);
-
-               add_wait_queue(sk_sleep(sk), &wait);
-               done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait);
-               remove_wait_queue(sk_sleep(sk), &wait);
-       } while (!done);
-       return 0;
-}
-
 /**
  * tipc_sendmsg - send message in connectionless manner
  * @sock: socket structure
@@ -881,35 +922,38 @@ static int tipc_sendmsg(struct socket *sock,
        return ret;
 }
 
-static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
+static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
 {
-       DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
        struct sock *sk = sock->sk;
-       struct tipc_sock *tsk = tipc_sk(sk);
        struct net *net = sock_net(sk);
-       struct tipc_msg *mhdr = &tsk->phdr;
-       u32 dnode, dport;
-       struct sk_buff_head pktchain;
-       bool is_connectionless = tipc_sk_type_connectionless(sk);
-       struct sk_buff *skb;
+       struct tipc_sock *tsk = tipc_sk(sk);
+       DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
+       long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+       struct list_head *clinks = &tsk->cong_links;
+       bool syn = !tipc_sk_type_connectionless(sk);
+       struct tipc_msg *hdr = &tsk->phdr;
        struct tipc_name_seq *seq;
-       struct iov_iter save;
-       u32 mtu;
-       long timeo;
-       int rc;
+       struct sk_buff_head pkts;
+       u32 type, inst, domain;
+       u32 dnode, dport;
+       int mtu, rc;
 
-       if (dsz > TIPC_MAX_USER_MSG_SIZE)
+       if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
                return -EMSGSIZE;
+
        if (unlikely(!dest)) {
-               if (is_connectionless && tsk->peer.family == AF_TIPC)
-                       dest = &tsk->peer;
-               else
+               dest = &tsk->peer;
+               if (!syn || dest->family != AF_TIPC)
                        return -EDESTADDRREQ;
-       } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
-                  dest->family != AF_TIPC) {
-               return -EINVAL;
        }
-       if (!is_connectionless) {
+
+       if (unlikely(m->msg_namelen < sizeof(*dest)))
+               return -EINVAL;
+
+       if (unlikely(dest->family != AF_TIPC))
+               return -EINVAL;
+
+       if (unlikely(syn)) {
                if (sk->sk_state == TIPC_LISTEN)
                        return -EPIPE;
                if (sk->sk_state != TIPC_OPEN)
@@ -921,102 +965,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
                        tsk->conn_instance = dest->addr.name.name.instance;
                }
        }
-       seq = &dest->addr.nameseq;
-       timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 
-       if (dest->addrtype == TIPC_ADDR_MCAST) {
-               return tipc_sendmcast(sock, seq, m, dsz, timeo);
-       } else if (dest->addrtype == TIPC_ADDR_NAME) {
-               u32 type = dest->addr.name.name.type;
-               u32 inst = dest->addr.name.name.instance;
-               u32 domain = dest->addr.name.domain;
+       seq = &dest->addr.nameseq;
+       if (dest->addrtype == TIPC_ADDR_MCAST)
+               return tipc_sendmcast(sock, seq, m, dlen, timeout);
 
+       if (dest->addrtype == TIPC_ADDR_NAME) {
+               type = dest->addr.name.name.type;
+               inst = dest->addr.name.name.instance;
+               domain = dest->addr.name.domain;
                dnode = domain;
-               msg_set_type(mhdr, TIPC_NAMED_MSG);
-               msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
-               msg_set_nametype(mhdr, type);
-               msg_set_nameinst(mhdr, inst);
-               msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
+               msg_set_type(hdr, TIPC_NAMED_MSG);
+               msg_set_hdr_sz(hdr, NAMED_H_SIZE);
+               msg_set_nametype(hdr, type);
+               msg_set_nameinst(hdr, inst);
+               msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
                dport = tipc_nametbl_translate(net, type, inst, &dnode);
-               msg_set_destnode(mhdr, dnode);
-               msg_set_destport(mhdr, dport);
+               msg_set_destnode(hdr, dnode);
+               msg_set_destport(hdr, dport);
                if (unlikely(!dport && !dnode))
                        return -EHOSTUNREACH;
+
        } else if (dest->addrtype == TIPC_ADDR_ID) {
                dnode = dest->addr.id.node;
-               msg_set_type(mhdr, TIPC_DIRECT_MSG);
-               msg_set_lookup_scope(mhdr, 0);
-               msg_set_destnode(mhdr, dnode);
-               msg_set_destport(mhdr, dest->addr.id.ref);
-               msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
+               msg_set_type(hdr, TIPC_DIRECT_MSG);
+               msg_set_lookup_scope(hdr, 0);
+               msg_set_destnode(hdr, dnode);
+               msg_set_destport(hdr, dest->addr.id.ref);
+               msg_set_hdr_sz(hdr, BASIC_H_SIZE);
        }
 
-       skb_queue_head_init(&pktchain);
-       save = m->msg_iter;
-new_mtu:
-       mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
-       rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
-       if (rc < 0)
+       /* Block or return if destination link is congested */
+       rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
+       if (unlikely(rc))
                return rc;
 
-       do {
-               skb = skb_peek(&pktchain);
-               TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-               rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
-               if (likely(!rc)) {
-                       if (!is_connectionless)
-                               tipc_set_sk_state(sk, TIPC_CONNECTING);
-                       return dsz;
-               }
-               if (rc == -ELINKCONG) {
-                       tsk->link_cong = 1;
-                       rc = tipc_wait_for_sndmsg(sock, &timeo);
-                       if (!rc)
-                               continue;
-               }
-               __skb_queue_purge(&pktchain);
-               if (rc == -EMSGSIZE) {
-                       m->msg_iter = save;
-                       goto new_mtu;
-               }
-               break;
-       } while (1);
+       skb_queue_head_init(&pkts);
+       mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+       rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
+       if (unlikely(rc != dlen))
+               return rc;
 
-       return rc;
-}
+       rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+       if (unlikely(rc == -ELINKCONG)) {
+               u32_push(clinks, dnode);
+               tsk->cong_link_cnt++;
+               rc = 0;
+       }
 
-static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
-{
-       DEFINE_WAIT_FUNC(wait, woken_wake_function);
-       struct sock *sk = sock->sk;
-       struct tipc_sock *tsk = tipc_sk(sk);
-       int done;
+       if (unlikely(syn && !rc))
+               tipc_set_sk_state(sk, TIPC_CONNECTING);
 
-       do {
-               int err = sock_error(sk);
-               if (err)
-                       return err;
-               if (sk->sk_state == TIPC_DISCONNECTING)
-                       return -EPIPE;
-               else if (!tipc_sk_connected(sk))
-                       return -ENOTCONN;
-               if (!*timeo_p)
-                       return -EAGAIN;
-               if (signal_pending(current))
-                       return sock_intr_errno(*timeo_p);
-
-               add_wait_queue(sk_sleep(sk), &wait);
-               done = sk_wait_event(sk, timeo_p,
-                                    (!tsk->link_cong &&
-                                     !tsk_conn_cong(tsk)) ||
-                                     !tipc_sk_connected(sk), &wait);
-               remove_wait_queue(sk_sleep(sk), &wait);
-       } while (!done);
-       return 0;
+       return rc ? rc : dlen;
 }
 
 /**
- * tipc_send_stream - send stream-oriented data
+ * tipc_sendstream - send stream-oriented data
  * @sock: socket structure
  * @m: data to send
  * @dsz: total length of data to be transmitted
@@ -1026,93 +1030,68 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
  * Returns the number of bytes sent on success (or partial success),
  * or errno if no data sent
  */
-static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
+static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
 {
        struct sock *sk = sock->sk;
        int ret;
 
        lock_sock(sk);
-       ret = __tipc_send_stream(sock, m, dsz);
+       ret = __tipc_sendstream(sock, m, dsz);
        release_sock(sk);
 
        return ret;
 }
 
-static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
+static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
 {
        struct sock *sk = sock->sk;
-       struct net *net = sock_net(sk);
-       struct tipc_sock *tsk = tipc_sk(sk);
-       struct tipc_msg *mhdr = &tsk->phdr;
-       struct sk_buff_head pktchain;
        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-       u32 portid = tsk->portid;
-       int rc = -EINVAL;
-       long timeo;
-       u32 dnode;
-       uint mtu, send, sent = 0;
-       struct iov_iter save;
-       int hlen = MIN_H_SIZE;
-
-       /* Handle implied connection establishment */
-       if (unlikely(dest)) {
-               rc = __tipc_sendmsg(sock, m, dsz);
-               hlen = msg_hdr_sz(mhdr);
-               if (dsz && (dsz == rc))
-                       tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
-               return rc;
-       }
-       if (dsz > (uint)INT_MAX)
-               return -EMSGSIZE;
-
-       if (unlikely(!tipc_sk_connected(sk))) {
-               if (sk->sk_state == TIPC_DISCONNECTING)
-                       return -EPIPE;
-               else
-                       return -ENOTCONN;
-       }
+       long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+       struct tipc_sock *tsk = tipc_sk(sk);
+       struct tipc_msg *hdr = &tsk->phdr;
+       struct net *net = sock_net(sk);
+       struct sk_buff_head pkts;
+       u32 dnode = tsk_peer_node(tsk);
+       int send, sent = 0;
+       int rc = 0;
 
-       timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
-       if (!timeo && tsk->link_cong)
-               return -ELINKCONG;
+       skb_queue_head_init(&pkts);
 
-       dnode = tsk_peer_node(tsk);
-       skb_queue_head_init(&pktchain);
+       if (unlikely(dlen > INT_MAX))
+               return -EMSGSIZE;
 
-next:
-       save = m->msg_iter;
-       mtu = tsk->max_pkt;
-       send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
-       rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
-       if (unlikely(rc < 0))
+       /* Handle implicit connection setup */
+       if (unlikely(dest)) {
+               rc = __tipc_sendmsg(sock, m, dlen);
+               if (dlen && (dlen == rc))
+                       tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
                return rc;
+       }
 
        do {
-               if (likely(!tsk_conn_cong(tsk))) {
-                       rc = tipc_node_xmit(net, &pktchain, dnode, portid);
-                       if (likely(!rc)) {
-                               tsk->snt_unacked += tsk_inc(tsk, send + hlen);
-                               sent += send;
-                               if (sent == dsz)
-                                       return dsz;
-                               goto next;
-                       }
-                       if (rc == -EMSGSIZE) {
-                               __skb_queue_purge(&pktchain);
-                               tsk->max_pkt = tipc_node_get_mtu(net, dnode,
-                                                                portid);
-                               m->msg_iter = save;
-                               goto next;
-                       }
-                       if (rc != -ELINKCONG)
-                               break;
+               rc = tipc_wait_for_cond(sock, &timeout,
+                                       (!tsk->cong_link_cnt &&
+                                        !tsk_conn_cong(tsk) &&
+                                        tipc_sk_connected(sk)));
+               if (unlikely(rc))
+                       break;
+
+               send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
+               rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
+               if (unlikely(rc != send))
+                       break;
 
-                       tsk->link_cong = 1;
+               rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
+               if (unlikely(rc == -ELINKCONG)) {
+                       tsk->cong_link_cnt = 1;
+                       rc = 0;
                }
-               rc = tipc_wait_for_sndpkt(sock, &timeo);
-       } while (!rc);
+               if (likely(!rc)) {
+                       tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
+                       sent += send;
+               }
+       } while (sent < dlen && !rc);
 
-       __skb_queue_purge(&pktchain);
        return sent ? sent : rc;
 }
 
@@ -1131,7 +1110,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
        if (dsz > TIPC_MAX_USER_MSG_SIZE)
                return -EMSGSIZE;
 
-       return tipc_send_stream(sock, m, dsz);
+       return tipc_sendstream(sock, m, dsz);
 }
 
 /* tipc_sk_finish_conn - complete the setup of a connection
@@ -1289,7 +1268,10 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
        struct sock *sk = sock->sk;
        DEFINE_WAIT(wait);
        long timeo = *timeop;
-       int err;
+       int err = sock_error(sk);
+
+       if (err)
+               return err;
 
        for (;;) {
                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
@@ -1311,6 +1293,10 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
                err = sock_intr_errno(timeo);
                if (signal_pending(current))
                        break;
+
+               err = sock_error(sk);
+               if (err)
+                       break;
        }
        finish_wait(sk_sleep(sk), &wait);
        *timeop = timeo;
@@ -1320,7 +1306,7 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
 /**
  * tipc_recvmsg - receive packet-oriented message
  * @m: descriptor for message info
- * @buf_len: total size of user buffer area
+ * @buflen: length of user buffer area
  * @flags: receive flags
  *
  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
@@ -1328,95 +1314,85 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
  *
  * Returns size of returned message data, errno otherwise
  */
-static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
-                       int flags)
+static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
+                       size_t buflen,  int flags)
 {
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
-       struct sk_buff *buf;
-       struct tipc_msg *msg;
-       bool is_connectionless = tipc_sk_type_connectionless(sk);
-       long timeo;
-       unsigned int sz;
-       u32 err;
-       int res, hlen;
+       struct sk_buff *skb;
+       struct tipc_msg *hdr;
+       bool connected = !tipc_sk_type_connectionless(sk);
+       int rc, err, hlen, dlen, copy;
+       long timeout;
 
        /* Catch invalid receive requests */
-       if (unlikely(!buf_len))
+       if (unlikely(!buflen))
                return -EINVAL;
 
        lock_sock(sk);
-
-       if (!is_connectionless && unlikely(sk->sk_state == TIPC_OPEN)) {
-               res = -ENOTCONN;
+       if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
+               rc = -ENOTCONN;
                goto exit;
        }
+       timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 
-       timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
-restart:
-
-       /* Look for a message in receive queue; wait if necessary */
-       res = tipc_wait_for_rcvmsg(sock, &timeo);
-       if (res)
-               goto exit;
-
-       /* Look at first message in receive queue */
-       buf = skb_peek(&sk->sk_receive_queue);
-       msg = buf_msg(buf);
-       sz = msg_data_sz(msg);
-       hlen = msg_hdr_sz(msg);
-       err = msg_errcode(msg);
-
-       /* Discard an empty non-errored message & try again */
-       if ((!sz) && (!err)) {
+       do {
+               /* Look at first msg in receive queue; wait if necessary */
+               rc = tipc_wait_for_rcvmsg(sock, &timeout);
+               if (unlikely(rc))
+                       goto exit;
+               skb = skb_peek(&sk->sk_receive_queue);
+               hdr = buf_msg(skb);
+               dlen = msg_data_sz(hdr);
+               hlen = msg_hdr_sz(hdr);
+               err = msg_errcode(hdr);
+               if (likely(dlen || err))
+                       break;
                tsk_advance_rx_queue(sk);
-               goto restart;
-       }
-
-       /* Capture sender's address (optional) */
-       set_orig_addr(m, msg);
+       } while (1);
 
-       /* Capture ancillary data (optional) */
-       res = tipc_sk_anc_data_recv(m, msg, tsk);
-       if (res)
+       /* Collect msg meta data, including error code and rejected data */
+       set_orig_addr(m, hdr);
+       rc = tipc_sk_anc_data_recv(m, hdr, tsk);
+       if (unlikely(rc))
                goto exit;
 
-       /* Capture message data (if valid) & compute return value (always) */
-       if (!err) {
-               if (unlikely(buf_len < sz)) {
-                       sz = buf_len;
+       /* Capture data if non-error msg, otherwise just set return value */
+       if (likely(!err)) {
+               copy = min_t(int, dlen, buflen);
+               if (unlikely(copy != dlen))
                        m->msg_flags |= MSG_TRUNC;
-               }
-               res = skb_copy_datagram_msg(buf, hlen, m, sz);
-               if (res)
-                       goto exit;
-               res = sz;
+               rc = skb_copy_datagram_msg(skb, hlen, m, copy);
        } else {
-               if (is_connectionless || err == TIPC_CONN_SHUTDOWN ||
-                   m->msg_control)
-                       res = 0;
-               else
-                       res = -ECONNRESET;
+               copy = 0;
+               rc = 0;
+               if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control)
+                       rc = -ECONNRESET;
        }
+       if (unlikely(rc))
+               goto exit;
 
+       /* Caption of data or error code/rejected data was successful */
        if (unlikely(flags & MSG_PEEK))
                goto exit;
 
-       if (likely(!is_connectionless)) {
-               tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
-               if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
-                       tipc_sk_send_ack(tsk);
-       }
        tsk_advance_rx_queue(sk);
+       if (likely(!connected))
+               goto exit;
+
+       /* Send connection flow control ack when applicable */
+       tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
+       if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
+               tipc_sk_send_ack(tsk);
 exit:
        release_sock(sk);
-       return res;
+       return rc ? rc : copy;
 }
 
 /**
- * tipc_recv_stream - receive stream-oriented data
+ * tipc_recvstream - receive stream-oriented data
  * @m: descriptor for message info
- * @buf_len: total size of user buffer area
+ * @buflen: total size of user buffer area
  * @flags: receive flags
  *
  * Used for SOCK_STREAM messages only.  If not enough data is available
@@ -1424,111 +1400,98 @@ exit:
  *
  * Returns size of returned message data, errno otherwise
  */
-static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
-                           size_t buf_len, int flags)
+static int tipc_recvstream(struct socket *sock, struct msghdr *m,
+                          size_t buflen, int flags)
 {
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
-       struct sk_buff *buf;
-       struct tipc_msg *msg;
-       long timeo;
-       unsigned int sz;
-       int target;
-       int sz_copied = 0;
-       u32 err;
-       int res = 0, hlen;
+       struct sk_buff *skb;
+       struct tipc_msg *hdr;
+       struct tipc_skb_cb *skb_cb;
+       bool peek = flags & MSG_PEEK;
+       int offset, required, copy, copied = 0;
+       int hlen, dlen, err, rc;
+       long timeout;
 
        /* Catch invalid receive attempts */
-       if (unlikely(!buf_len))
+       if (unlikely(!buflen))
                return -EINVAL;
 
        lock_sock(sk);
 
        if (unlikely(sk->sk_state == TIPC_OPEN)) {
-               res = -ENOTCONN;
-               goto exit;
-       }
-
-       target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
-       timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
-
-restart:
-       /* Look for a message in receive queue; wait if necessary */
-       res = tipc_wait_for_rcvmsg(sock, &timeo);
-       if (res)
+               rc = -ENOTCONN;
                goto exit;
-
-       /* Look at first message in receive queue */
-       buf = skb_peek(&sk->sk_receive_queue);
-       msg = buf_msg(buf);
-       sz = msg_data_sz(msg);
-       hlen = msg_hdr_sz(msg);
-       err = msg_errcode(msg);
-
-       /* Discard an empty non-errored message & try again */
-       if ((!sz) && (!err)) {
-               tsk_advance_rx_queue(sk);
-               goto restart;
-       }
-
-       /* Optionally capture sender's address & ancillary data of first msg */
-       if (sz_copied == 0) {
-               set_orig_addr(m, msg);
-               res = tipc_sk_anc_data_recv(m, msg, tsk);
-               if (res)
-                       goto exit;
        }
+       required = sock_rcvlowat(sk, flags & MSG_WAITALL, buflen);
+       timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 
-       /* Capture message data (if valid) & compute return value (always) */
-       if (!err) {
-               u32 offset = TIPC_SKB_CB(buf)->bytes_read;
-               u32 needed;
-               int sz_to_copy;
-
-               sz -= offset;
-               needed = (buf_len - sz_copied);
-               sz_to_copy = min(sz, needed);
-
-               res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
-               if (res)
-                       goto exit;
+       do {
+               /* Look at first msg in receive queue; wait if necessary */
+               rc = tipc_wait_for_rcvmsg(sock, &timeout);
+               if (unlikely(rc))
+                       break;
+               skb = skb_peek(&sk->sk_receive_queue);
+               skb_cb = TIPC_SKB_CB(skb);
+               hdr = buf_msg(skb);
+               dlen = msg_data_sz(hdr);
+               hlen = msg_hdr_sz(hdr);
+               err = msg_errcode(hdr);
+
+               /* Discard any empty non-errored (SYN-) message */
+               if (unlikely(!dlen && !err)) {
+                       tsk_advance_rx_queue(sk);
+                       continue;
+               }
 
-               sz_copied += sz_to_copy;
+               /* Collect msg meta data, incl. error code and rejected data */
+               if (!copied) {
+                       set_orig_addr(m, hdr);
+                       rc = tipc_sk_anc_data_recv(m, hdr, tsk);
+                       if (rc)
+                               break;
+               }
 
-               if (sz_to_copy < sz) {
-                       if (!(flags & MSG_PEEK))
-                               TIPC_SKB_CB(buf)->bytes_read =
-                                       offset + sz_to_copy;
-                       goto exit;
+               /* Copy data if msg ok, otherwise return error/partial data */
+               if (likely(!err)) {
+                       offset = skb_cb->bytes_read;
+                       copy = min_t(int, dlen - offset, buflen - copied);
+                       rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
+                       if (unlikely(rc))
+                               break;
+                       copied += copy;
+                       offset += copy;
+                       if (unlikely(offset < dlen)) {
+                               if (!peek)
+                                       skb_cb->bytes_read = offset;
+                               break;
+                       }
+               } else {
+                       rc = 0;
+                       if ((err != TIPC_CONN_SHUTDOWN) && !m->msg_control)
+                               rc = -ECONNRESET;
+                       if (copied || rc)
+                               break;
                }
-       } else {
-               if (sz_copied != 0)
-                       goto exit; /* can't add error msg to valid data */
 
-               if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
-                       res = 0;
-               else
-                       res = -ECONNRESET;
-       }
+               if (unlikely(peek))
+                       break;
 
-       if (unlikely(flags & MSG_PEEK))
-               goto exit;
+               tsk_advance_rx_queue(sk);
 
-       tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
-       if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
-               tipc_sk_send_ack(tsk);
-       tsk_advance_rx_queue(sk);
+               /* Send connection flow control advertisement when applicable */
+               tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
+               if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE))
+                       tipc_sk_send_ack(tsk);
 
-       /* Loop around if more data is required */
-       if ((sz_copied < buf_len) &&    /* didn't get all requested data */
-           (!skb_queue_empty(&sk->sk_receive_queue) ||
-           (sz_copied < target)) &&    /* and more is ready or required */
-           (!err))                     /* and haven't reached a FIN */
-               goto restart;
+               /* Exit if all requested data or FIN/error received */
+               if (copied == buflen || err)
+                       break;
 
+       } while (!skb_queue_empty(&sk->sk_receive_queue) || copied < required);
 exit:
        release_sock(sk);
-       return sz_copied ? sz_copied : res;
+       return copied ? copied : rc;
 }
 
 /**
@@ -1581,6 +1544,8 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
        struct sock *sk = &tsk->sk;
        struct net *net = sock_net(sk);
        struct tipc_msg *hdr = buf_msg(skb);
+       u32 pport = msg_origport(hdr);
+       u32 pnode = msg_orignode(hdr);
 
        if (unlikely(msg_mcast(hdr)))
                return false;
@@ -1588,18 +1553,28 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
        switch (sk->sk_state) {
        case TIPC_CONNECTING:
                /* Accept only ACK or NACK message */
-               if (unlikely(!msg_connected(hdr)))
-                       return false;
+               if (unlikely(!msg_connected(hdr))) {
+                       if (pport != tsk_peer_port(tsk) ||
+                           pnode != tsk_peer_node(tsk))
+                               return false;
+
+                       tipc_set_sk_state(sk, TIPC_DISCONNECTING);
+                       sk->sk_err = ECONNREFUSED;
+                       sk->sk_state_change(sk);
+                       return true;
+               }
 
                if (unlikely(msg_errcode(hdr))) {
                        tipc_set_sk_state(sk, TIPC_DISCONNECTING);
                        sk->sk_err = ECONNREFUSED;
+                       sk->sk_state_change(sk);
                        return true;
                }
 
                if (unlikely(!msg_isdata(hdr))) {
                        tipc_set_sk_state(sk, TIPC_DISCONNECTING);
                        sk->sk_err = EINVAL;
+                       sk->sk_state_change(sk);
                        return true;
                }
 
@@ -1611,8 +1586,7 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
                        return true;
 
                /* If empty 'ACK-' message, wake up sleeping connect() */
-               if (waitqueue_active(sk_sleep(sk)))
-                       wake_up_interruptible(sk_sleep(sk));
+               sk->sk_data_ready(sk);
 
                /* 'ACK-' message is neither accepted nor rejected: */
                msg_set_dest_droppable(hdr, 1);
@@ -1698,6 +1672,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
        unsigned int limit = rcvbuf_limit(sk, skb);
        int err = TIPC_OK;
        int usr = msg_user(hdr);
+       u32 onode;
 
        if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
                tipc_sk_proto_rcv(tsk, skb, xmitq);
@@ -1705,8 +1680,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
        }
 
        if (unlikely(usr == SOCK_WAKEUP)) {
+               onode = msg_orignode(hdr);
                kfree_skb(skb);
-               tsk->link_cong = 0;
+               u32_del(&tsk->cong_links, onode);
+               tsk->cong_link_cnt--;
                sk->sk_write_space(sk);
                return false;
        }
@@ -2057,7 +2034,8 @@ static int tipc_wait_for_accept(struct socket *sock, long timeo)
  *
  * Returns 0 on success, errno otherwise
  */
-static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
+static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
+                      bool kern)
 {
        struct sock *new_sk, *sk = sock->sk;
        struct sk_buff *buf;
@@ -2079,7 +2057,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
 
        buf = skb_peek(&sk->sk_receive_queue);
 
-       res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 0);
+       res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, kern);
        if (res)
                goto exit;
        security_sk_clone(sock->sk, new_sock->sk);
@@ -2114,7 +2092,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
                struct msghdr m = {NULL,};
 
                tsk_advance_rx_queue(sk);
-               __tipc_send_stream(new_sock, &m, 0);
+               __tipc_sendstream(new_sock, &m, 0);
        } else {
                __skb_dequeue(&sk->sk_receive_queue);
                __skb_queue_head(&new_sk->sk_receive_queue, buf);
@@ -2269,24 +2247,27 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
 void tipc_sk_reinit(struct net *net)
 {
        struct tipc_net *tn = net_generic(net, tipc_net_id);
-       const struct bucket_table *tbl;
-       struct rhash_head *pos;
+       struct rhashtable_iter iter;
        struct tipc_sock *tsk;
        struct tipc_msg *msg;
-       int i;
 
-       rcu_read_lock();
-       tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
-       for (i = 0; i < tbl->size; i++) {
-               rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
+       rhashtable_walk_enter(&tn->sk_rht, &iter);
+
+       do {
+               tsk = ERR_PTR(rhashtable_walk_start(&iter));
+               if (tsk)
+                       continue;
+
+               while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
                        spin_lock_bh(&tsk->sk.sk_lock.slock);
                        msg = &tsk->phdr;
                        msg_set_prevnode(msg, tn->own_addr);
                        msg_set_orignode(msg, tn->own_addr);
                        spin_unlock_bh(&tsk->sk.sk_lock.slock);
                }
-       }
-       rcu_read_unlock();
+
+               rhashtable_walk_stop(&iter);
+       } while (tsk == ERR_PTR(-EAGAIN));
 }
 
 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
@@ -2382,18 +2363,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
 {
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
-       u32 value;
-       int res;
+       u32 value = 0;
+       int res = 0;
 
        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
                return 0;
        if (lvl != SOL_TIPC)
                return -ENOPROTOOPT;
-       if (ol < sizeof(value))
-               return -EINVAL;
-       res = get_user(value, (u32 __user *)ov);
-       if (res)
-               return res;
+
+       switch (opt) {
+       case TIPC_IMPORTANCE:
+       case TIPC_SRC_DROPPABLE:
+       case TIPC_DEST_DROPPABLE:
+       case TIPC_CONN_TIMEOUT:
+               if (ol < sizeof(value))
+                       return -EINVAL;
+               res = get_user(value, (u32 __user *)ov);
+               if (res)
+                       return res;
+               break;
+       default:
+               if (ov || ol)
+                       return -EINVAL;
+       }
 
        lock_sock(sk);
 
@@ -2412,7 +2404,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
                break;
        case TIPC_CONN_TIMEOUT:
                tipc_sk(sk)->conn_timeout = value;
-               /* no need to set "res", since already 0 at this point */
+               break;
+       case TIPC_MCAST_BROADCAST:
+               tsk->mc_method.rcast = false;
+               tsk->mc_method.mandatory = true;
+               break;
+       case TIPC_MCAST_REPLICAST:
+               tsk->mc_method.rcast = true;
+               tsk->mc_method.mandatory = true;
                break;
        default:
                res = -EINVAL;
@@ -2516,6 +2515,28 @@ static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
        }
 }
 
+static int tipc_socketpair(struct socket *sock1, struct socket *sock2)
+{
+       struct tipc_sock *tsk2 = tipc_sk(sock2->sk);
+       struct tipc_sock *tsk1 = tipc_sk(sock1->sk);
+       u32 onode = tipc_own_addr(sock_net(sock1->sk));
+
+       tsk1->peer.family = AF_TIPC;
+       tsk1->peer.addrtype = TIPC_ADDR_ID;
+       tsk1->peer.scope = TIPC_NODE_SCOPE;
+       tsk1->peer.addr.id.ref = tsk2->portid;
+       tsk1->peer.addr.id.node = onode;
+       tsk2->peer.family = AF_TIPC;
+       tsk2->peer.addrtype = TIPC_ADDR_ID;
+       tsk2->peer.scope = TIPC_NODE_SCOPE;
+       tsk2->peer.addr.id.ref = tsk1->portid;
+       tsk2->peer.addr.id.node = onode;
+
+       tipc_sk_finish_conn(tsk1, tsk2->portid, onode);
+       tipc_sk_finish_conn(tsk2, tsk1->portid, onode);
+       return 0;
+}
+
 /* Protocol switches for the various types of TIPC sockets */
 
 static const struct proto_ops msg_ops = {
@@ -2524,7 +2545,7 @@ static const struct proto_ops msg_ops = {
        .release        = tipc_release,
        .bind           = tipc_bind,
        .connect        = tipc_connect,
-       .socketpair     = sock_no_socketpair,
+       .socketpair     = tipc_socketpair,
        .accept         = sock_no_accept,
        .getname        = tipc_getname,
        .poll           = tipc_poll,
@@ -2545,7 +2566,7 @@ static const struct proto_ops packet_ops = {
        .release        = tipc_release,
        .bind           = tipc_bind,
        .connect        = tipc_connect,
-       .socketpair     = sock_no_socketpair,
+       .socketpair     = tipc_socketpair,
        .accept         = tipc_accept,
        .getname        = tipc_getname,
        .poll           = tipc_poll,
@@ -2566,7 +2587,7 @@ static const struct proto_ops stream_ops = {
        .release        = tipc_release,
        .bind           = tipc_bind,
        .connect        = tipc_connect,
-       .socketpair     = sock_no_socketpair,
+       .socketpair     = tipc_socketpair,
        .accept         = tipc_accept,
        .getname        = tipc_getname,
        .poll           = tipc_poll,
@@ -2575,8 +2596,8 @@ static const struct proto_ops stream_ops = {
        .shutdown       = tipc_shutdown,
        .setsockopt     = tipc_setsockopt,
        .getsockopt     = tipc_getsockopt,
-       .sendmsg        = tipc_send_stream,
-       .recvmsg        = tipc_recv_stream,
+       .sendmsg        = tipc_sendstream,
+       .recvmsg        = tipc_recvstream,
        .mmap           = sock_no_mmap,
        .sendpage       = sock_no_sendpage
 };
@@ -2849,7 +2870,7 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
 
                err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
                                       attrs[TIPC_NLA_SOCK],
-                                      tipc_nl_sock_policy);
+                                      tipc_nl_sock_policy, NULL);
                if (err)
                        return err;