OSDN Git Service

net/smc: switch connections to alternate link
authorKarsten Graul <kgraul@linux.ibm.com>
Mon, 4 May 2020 12:18:38 +0000 (14:18 +0200)
committerDavid S. Miller <davem@davemloft.net>
Mon, 4 May 2020 17:54:39 +0000 (10:54 -0700)
Add smc_switch_conns() to switch all connections from a link that is
going down. Find an other link to switch the connections to, and
switch each connection to the new link. smc_switch_cursor() updates the
cursors of a connection to the state of the last successfully sent CDC
message. When there is no link to switch to, terminate the link group.
Call smc_switch_conns() when a link is going down.
And with the possibility that links of connections can switch adapt CDC
and TX functions to detect and handle link switches.

Signed-off-by: Karsten Graul <kgraul@linux.ibm.com>
Reviewed-by: Ursula Braun <ubraun@linux.ibm.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/smc/smc_cdc.c
net/smc/smc_cdc.h
net/smc/smc_core.c
net/smc/smc_core.h
net/smc/smc_llc.c
net/smc/smc_tx.c

index c5e3329..3ca9860 100644 (file)
@@ -56,11 +56,11 @@ static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
 }
 
 int smc_cdc_get_free_slot(struct smc_connection *conn,
+                         struct smc_link *link,
                          struct smc_wr_buf **wr_buf,
                          struct smc_rdma_wr **wr_rdma_buf,
                          struct smc_cdc_tx_pend **pend)
 {
-       struct smc_link *link = conn->lnk;
        int rc;
 
        rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf,
@@ -119,13 +119,27 @@ static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
 {
        struct smc_cdc_tx_pend *pend;
        struct smc_wr_buf *wr_buf;
+       struct smc_link *link;
+       bool again = false;
        int rc;
 
-       rc = smc_cdc_get_free_slot(conn, &wr_buf, NULL, &pend);
+again:
+       link = conn->lnk;
+       rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
        if (rc)
                return rc;
 
        spin_lock_bh(&conn->send_lock);
+       if (link != conn->lnk) {
+               /* link of connection changed, try again one time*/
+               spin_unlock_bh(&conn->send_lock);
+               smc_wr_tx_put_slot(link,
+                                  (struct smc_wr_tx_pend_priv *)pend);
+               if (again)
+                       return -ENOLINK;
+               again = true;
+               goto again;
+       }
        rc = smc_cdc_msg_send(conn, wr_buf, pend);
        spin_unlock_bh(&conn->send_lock);
        return rc;
index 861dc24..42246b4 100644 (file)
@@ -304,6 +304,7 @@ struct smc_cdc_tx_pend {
 };
 
 int smc_cdc_get_free_slot(struct smc_connection *conn,
+                         struct smc_link *link,
                          struct smc_wr_buf **wr_buf,
                          struct smc_rdma_wr **wr_rdma_buf,
                          struct smc_cdc_tx_pend **pend);
index 32a6cad..21bc1ec 100644 (file)
@@ -432,6 +432,135 @@ out:
        return rc;
 }
 
+static int smc_write_space(struct smc_connection *conn)
+{
+       int buffer_len = conn->peer_rmbe_size;
+       union smc_host_cursor prod;
+       union smc_host_cursor cons;
+       int space;
+
+       smc_curs_copy(&prod, &conn->local_tx_ctrl.prod, conn);
+       smc_curs_copy(&cons, &conn->local_rx_ctrl.cons, conn);
+       /* determine rx_buf space */
+       space = buffer_len - smc_curs_diff(buffer_len, &cons, &prod);
+       return space;
+}
+
+static int smc_switch_cursor(struct smc_sock *smc)
+{
+       struct smc_connection *conn = &smc->conn;
+       union smc_host_cursor cons, fin;
+       int rc = 0;
+       int diff;
+
+       smc_curs_copy(&conn->tx_curs_sent, &conn->tx_curs_fin, conn);
+       smc_curs_copy(&fin, &conn->local_tx_ctrl_fin, conn);
+       /* set prod cursor to old state, enforce tx_rdma_writes() */
+       smc_curs_copy(&conn->local_tx_ctrl.prod, &fin, conn);
+       smc_curs_copy(&cons, &conn->local_rx_ctrl.cons, conn);
+
+       if (smc_curs_comp(conn->peer_rmbe_size, &cons, &fin) < 0) {
+               /* cons cursor advanced more than fin, and prod was set
+                * fin above, so now prod is smaller than cons. Fix that.
+                */
+               diff = smc_curs_diff(conn->peer_rmbe_size, &fin, &cons);
+               smc_curs_add(conn->sndbuf_desc->len,
+                            &conn->tx_curs_sent, diff);
+               smc_curs_add(conn->sndbuf_desc->len,
+                            &conn->tx_curs_fin, diff);
+
+               smp_mb__before_atomic();
+               atomic_add(diff, &conn->sndbuf_space);
+               smp_mb__after_atomic();
+
+               smc_curs_add(conn->peer_rmbe_size,
+                            &conn->local_tx_ctrl.prod, diff);
+               smc_curs_add(conn->peer_rmbe_size,
+                            &conn->local_tx_ctrl_fin, diff);
+       }
+       /* recalculate, value is used by tx_rdma_writes() */
+       atomic_set(&smc->conn.peer_rmbe_space, smc_write_space(conn));
+
+       if (smc->sk.sk_state != SMC_INIT &&
+           smc->sk.sk_state != SMC_CLOSED) {
+               /* tbd: call rc = smc_cdc_get_slot_and_msg_send(conn); */
+               if (!rc) {
+                       schedule_delayed_work(&conn->tx_work, 0);
+                       smc->sk.sk_data_ready(&smc->sk);
+               }
+       }
+       return rc;
+}
+
+struct smc_link *smc_switch_conns(struct smc_link_group *lgr,
+                                 struct smc_link *from_lnk, bool is_dev_err)
+{
+       struct smc_link *to_lnk = NULL;
+       struct smc_connection *conn;
+       struct smc_sock *smc;
+       struct rb_node *node;
+       int i, rc = 0;
+
+       /* link is inactive, wake up tx waiters */
+       smc_wr_wakeup_tx_wait(from_lnk);
+
+       for (i = 0; i < SMC_LINKS_PER_LGR_MAX; i++) {
+               if (lgr->lnk[i].state != SMC_LNK_ACTIVE ||
+                   i == from_lnk->link_idx)
+                       continue;
+               if (is_dev_err && from_lnk->smcibdev == lgr->lnk[i].smcibdev &&
+                   from_lnk->ibport == lgr->lnk[i].ibport) {
+                       continue;
+               }
+               to_lnk = &lgr->lnk[i];
+               break;
+       }
+       if (!to_lnk) {
+               smc_lgr_terminate_sched(lgr);
+               return NULL;
+       }
+again:
+       read_lock_bh(&lgr->conns_lock);
+       for (node = rb_first(&lgr->conns_all); node; node = rb_next(node)) {
+               conn = rb_entry(node, struct smc_connection, alert_node);
+               if (conn->lnk != from_lnk)
+                       continue;
+               smc = container_of(conn, struct smc_sock, conn);
+               /* conn->lnk not yet set in SMC_INIT state */
+               if (smc->sk.sk_state == SMC_INIT)
+                       continue;
+               if (smc->sk.sk_state == SMC_CLOSED ||
+                   smc->sk.sk_state == SMC_PEERCLOSEWAIT1 ||
+                   smc->sk.sk_state == SMC_PEERCLOSEWAIT2 ||
+                   smc->sk.sk_state == SMC_APPFINCLOSEWAIT ||
+                   smc->sk.sk_state == SMC_APPCLOSEWAIT1 ||
+                   smc->sk.sk_state == SMC_APPCLOSEWAIT2 ||
+                   smc->sk.sk_state == SMC_PEERFINCLOSEWAIT ||
+                   smc->sk.sk_state == SMC_PEERABORTWAIT ||
+                   smc->sk.sk_state == SMC_PROCESSABORT) {
+                       spin_lock_bh(&conn->send_lock);
+                       conn->lnk = to_lnk;
+                       spin_unlock_bh(&conn->send_lock);
+                       continue;
+               }
+               sock_hold(&smc->sk);
+               read_unlock_bh(&lgr->conns_lock);
+               /* avoid race with smcr_tx_sndbuf_nonempty() */
+               spin_lock_bh(&conn->send_lock);
+               conn->lnk = to_lnk;
+               rc = smc_switch_cursor(smc);
+               spin_unlock_bh(&conn->send_lock);
+               sock_put(&smc->sk);
+               if (rc) {
+                       smcr_link_down_cond_sched(to_lnk);
+                       return NULL;
+               }
+               goto again;
+       }
+       read_unlock_bh(&lgr->conns_lock);
+       return to_lnk;
+}
+
 static void smcr_buf_unuse(struct smc_buf_desc *rmb_desc,
                           struct smc_link_group *lgr)
 {
@@ -943,8 +1072,7 @@ static void smcr_link_down(struct smc_link *lnk)
                return;
 
        smc_ib_modify_qp_reset(lnk);
-       to_lnk = NULL;
-       /* tbd: call to_lnk = smc_switch_conns(lgr, lnk, true); */
+       to_lnk = smc_switch_conns(lgr, lnk, true);
        if (!to_lnk) { /* no backup link available */
                smcr_link_clear(lnk);
                return;
index 7fe53fe..584f112 100644 (file)
@@ -380,6 +380,8 @@ void smcr_link_clear(struct smc_link *lnk);
 int smcr_buf_map_lgr(struct smc_link *lnk);
 int smcr_buf_reg_lgr(struct smc_link *lnk);
 int smcr_link_reg_rmb(struct smc_link *link, struct smc_buf_desc *rmb_desc);
+struct smc_link *smc_switch_conns(struct smc_link_group *lgr,
+                                 struct smc_link *from_lnk, bool is_dev_err);
 void smcr_link_down_cond(struct smc_link *lnk);
 void smcr_link_down_cond_sched(struct smc_link *lnk);
 
index 7675ccd..8d2368a 100644 (file)
@@ -933,7 +933,7 @@ static void smc_llc_delete_asym_link(struct smc_link_group *lgr)
                return; /* no asymmetric link */
        if (!smc_link_downing(&lnk_asym->state))
                return;
-       /* tbd: lnk_new = smc_switch_conns(lgr, lnk_asym, false); */
+       lnk_new = smc_switch_conns(lgr, lnk_asym, false);
        smc_wr_tx_wait_no_pending_sends(lnk_asym);
        if (!lnk_new)
                goto out_free;
@@ -1195,7 +1195,7 @@ static void smc_llc_process_cli_delete_link(struct smc_link_group *lgr)
        smc_llc_send_message(lnk, &qentry->msg); /* response */
 
        if (smc_link_downing(&lnk_del->state)) {
-               /* tbd: call smc_switch_conns(lgr, lnk_del, false); */
+               smc_switch_conns(lgr, lnk_del, false);
                smc_wr_tx_wait_no_pending_sends(lnk_del);
        }
        smcr_link_clear(lnk_del);
@@ -1245,7 +1245,7 @@ static void smc_llc_process_srv_delete_link(struct smc_link_group *lgr)
                goto out; /* asymmetric link already deleted */
 
        if (smc_link_downing(&lnk_del->state)) {
-               /* tbd: call smc_switch_conns(lgr, lnk_del, false); */
+               smc_switch_conns(lgr, lnk_del, false);
                smc_wr_tx_wait_no_pending_sends(lnk_del);
        }
        if (!list_empty(&lgr->list)) {
index 4172045..54ba044 100644 (file)
@@ -482,12 +482,13 @@ static int smc_tx_rdma_writes(struct smc_connection *conn,
 static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
 {
        struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
+       struct smc_link *link = conn->lnk;
        struct smc_rdma_wr *wr_rdma_buf;
        struct smc_cdc_tx_pend *pend;
        struct smc_wr_buf *wr_buf;
        int rc;
 
-       rc = smc_cdc_get_free_slot(conn, &wr_buf, &wr_rdma_buf, &pend);
+       rc = smc_cdc_get_free_slot(conn, link, &wr_buf, &wr_rdma_buf, &pend);
        if (rc < 0) {
                if (rc == -EBUSY) {
                        struct smc_sock *smc =
@@ -505,10 +506,17 @@ static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
        }
 
        spin_lock_bh(&conn->send_lock);
+       if (link != conn->lnk) {
+               /* link of connection changed, tx_work will restart */
+               smc_wr_tx_put_slot(link,
+                                  (struct smc_wr_tx_pend_priv *)pend);
+               rc = -ENOLINK;
+               goto out_unlock;
+       }
        if (!pflags->urg_data_present) {
                rc = smc_tx_rdma_writes(conn, wr_rdma_buf);
                if (rc) {
-                       smc_wr_tx_put_slot(conn->lnk,
+                       smc_wr_tx_put_slot(link,
                                           (struct smc_wr_tx_pend_priv *)pend);
                        goto out_unlock;
                }