@@ -150,9 +150,11 @@ static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
again:
link = conn->lnk;
+ if (!smc_wr_tx_link_hold(link))
+ return -ENOLINK;
rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
if (rc)
- return rc;
+ goto put_out;
spin_lock_bh(&conn->send_lock);
if (link != conn->lnk) {
@@ -160,6 +162,7 @@ again:
spin_unlock_bh(&conn->send_lock);
smc_wr_tx_put_slot(link,
(struct smc_wr_tx_pend_priv *)pend);
+ smc_wr_tx_link_put(link);
if (again)
return -ENOLINK;
again = true;
@@ -167,6 +170,8 @@ again:
}
rc = smc_cdc_msg_send(conn, wr_buf, pend);
spin_unlock_bh(&conn->send_lock);
+put_out:
+ smc_wr_tx_link_put(link);
return rc;
}
@@ -550,7 +550,7 @@ struct smc_link *smc_switch_conns(struct smc_link_group *lgr,
to_lnk = &lgr->lnk[i];
break;
}
- if (!to_lnk) {
+ if (!to_lnk || !smc_wr_tx_link_hold(to_lnk)) {
smc_lgr_terminate_sched(lgr);
return NULL;
}
@@ -582,24 +582,26 @@ again:
read_unlock_bh(&lgr->conns_lock);
/* pre-fetch buffer outside of send_lock, might sleep */
rc = smc_cdc_get_free_slot(conn, to_lnk, &wr_buf, NULL, &pend);
- if (rc) {
- smcr_link_down_cond_sched(to_lnk);
- return NULL;
- }
+ if (rc)
+ goto err_out;
/* avoid race with smcr_tx_sndbuf_nonempty() */
spin_lock_bh(&conn->send_lock);
conn->lnk = to_lnk;
rc = smc_switch_cursor(smc, pend, wr_buf);
spin_unlock_bh(&conn->send_lock);
sock_put(&smc->sk);
- if (rc) {
- smcr_link_down_cond_sched(to_lnk);
- return NULL;
- }
+ if (rc)
+ goto err_out;
goto again;
}
read_unlock_bh(&lgr->conns_lock);
+ smc_wr_tx_link_put(to_lnk);
return to_lnk;
+
+err_out:
+ smcr_link_down_cond_sched(to_lnk);
+ smc_wr_tx_link_put(to_lnk);
+ return NULL;
}
static void smcr_buf_unuse(struct smc_buf_desc *rmb_desc,
@@ -383,9 +383,11 @@ int smc_llc_send_confirm_link(struct smc_link *link,
struct smc_wr_buf *wr_buf;
int rc;
+ if (!smc_wr_tx_link_hold(link))
+ return -ENOLINK;
rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
if (rc)
- return rc;
+ goto put_out;
confllc = (struct smc_llc_msg_confirm_link *)wr_buf;
memset(confllc, 0, sizeof(*confllc));
confllc->hd.common.type = SMC_LLC_CONFIRM_LINK;
@@ -402,6 +404,8 @@ int smc_llc_send_confirm_link(struct smc_link *link,
confllc->max_links = SMC_LLC_ADD_LNK_MAX_LINKS;
/* send llc message */
rc = smc_wr_tx_send(link, pend);
+put_out:
+ smc_wr_tx_link_put(link);
return rc;
}
@@ -415,9 +419,11 @@ static int smc_llc_send_confirm_rkey(struct smc_link *send_link,
struct smc_link *link;
int i, rc, rtok_ix;
+ if (!smc_wr_tx_link_hold(send_link))
+ return -ENOLINK;
rc = smc_llc_add_pending_send(send_link, &wr_buf, &pend);
if (rc)
- return rc;
+ goto put_out;
rkeyllc = (struct smc_llc_msg_confirm_rkey *)wr_buf;
memset(rkeyllc, 0, sizeof(*rkeyllc));
rkeyllc->hd.common.type = SMC_LLC_CONFIRM_RKEY;
@@ -444,6 +450,8 @@ static int smc_llc_send_confirm_rkey(struct smc_link *send_link,
(u64)sg_dma_address(rmb_desc->sgt[send_link->link_idx].sgl));
/* send llc message */
rc = smc_wr_tx_send(send_link, pend);
+put_out:
+ smc_wr_tx_link_put(send_link);
return rc;
}
@@ -456,9 +464,11 @@ static int smc_llc_send_delete_rkey(struct smc_link *link,
struct smc_wr_buf *wr_buf;
int rc;
+ if (!smc_wr_tx_link_hold(link))
+ return -ENOLINK;
rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
if (rc)
- return rc;
+ goto put_out;
rkeyllc = (struct smc_llc_msg_delete_rkey *)wr_buf;
memset(rkeyllc, 0, sizeof(*rkeyllc));
rkeyllc->hd.common.type = SMC_LLC_DELETE_RKEY;
@@ -467,6 +477,8 @@ static int smc_llc_send_delete_rkey(struct smc_link *link,
rkeyllc->rkey[0] = htonl(rmb_desc->mr_rx[link->link_idx]->rkey);
/* send llc message */
rc = smc_wr_tx_send(link, pend);
+put_out:
+ smc_wr_tx_link_put(link);
return rc;
}
@@ -480,9 +492,11 @@ int smc_llc_send_add_link(struct smc_link *link, u8 mac[], u8 gid[],
struct smc_wr_buf *wr_buf;
int rc;
+ if (!smc_wr_tx_link_hold(link))
+ return -ENOLINK;
rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
if (rc)
- return rc;
+ goto put_out;
addllc = (struct smc_llc_msg_add_link *)wr_buf;
memset(addllc, 0, sizeof(*addllc));
@@ -504,6 +518,8 @@ int smc_llc_send_add_link(struct smc_link *link, u8 mac[], u8 gid[],
}
/* send llc message */
rc = smc_wr_tx_send(link, pend);
+put_out:
+ smc_wr_tx_link_put(link);
return rc;
}
@@ -517,9 +533,11 @@ int smc_llc_send_delete_link(struct smc_link *link, u8 link_del_id,
struct smc_wr_buf *wr_buf;
int rc;
+ if (!smc_wr_tx_link_hold(link))
+ return -ENOLINK;
rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
if (rc)
- return rc;
+ goto put_out;
delllc = (struct smc_llc_msg_del_link *)wr_buf;
memset(delllc, 0, sizeof(*delllc));
@@ -536,6 +554,8 @@ int smc_llc_send_delete_link(struct smc_link *link, u8 link_del_id,
delllc->reason = htonl(reason);
/* send llc message */
rc = smc_wr_tx_send(link, pend);
+put_out:
+ smc_wr_tx_link_put(link);
return rc;
}
@@ -547,9 +567,11 @@ static int smc_llc_send_test_link(struct smc_link *link, u8 user_data[16])
struct smc_wr_buf *wr_buf;
int rc;
+ if (!smc_wr_tx_link_hold(link))
+ return -ENOLINK;
rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
if (rc)
- return rc;
+ goto put_out;
testllc = (struct smc_llc_msg_test_link *)wr_buf;
memset(testllc, 0, sizeof(*testllc));
testllc->hd.common.type = SMC_LLC_TEST_LINK;
@@ -557,6 +579,8 @@ static int smc_llc_send_test_link(struct smc_link *link, u8 user_data[16])
memcpy(testllc->user_data, user_data, sizeof(testllc->user_data));
/* send llc message */
rc = smc_wr_tx_send(link, pend);
+put_out:
+ smc_wr_tx_link_put(link);
return rc;
}
@@ -567,13 +591,16 @@ static int smc_llc_send_message(struct smc_link *link, void *llcbuf)
struct smc_wr_buf *wr_buf;
int rc;
- if (!smc_link_usable(link))
+ if (!smc_wr_tx_link_hold(link))
return -ENOLINK;
rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
if (rc)
- return rc;
+ goto put_out;
memcpy(wr_buf, llcbuf, sizeof(union smc_llc_msg));
- return smc_wr_tx_send(link, pend);
+ rc = smc_wr_tx_send(link, pend);
+put_out:
+ smc_wr_tx_link_put(link);
+ return rc;
}
/* schedule an llc send on link, may wait for buffers,
@@ -586,13 +613,16 @@ static int smc_llc_send_message_wait(struct smc_link *link, void *llcbuf)
struct smc_wr_buf *wr_buf;
int rc;
- if (!smc_link_usable(link))
+ if (!smc_wr_tx_link_hold(link))
return -ENOLINK;
rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
if (rc)
- return rc;
+ goto put_out;
memcpy(wr_buf, llcbuf, sizeof(union smc_llc_msg));
- return smc_wr_tx_send_wait(link, pend, SMC_LLC_WAIT_TIME);
+ rc = smc_wr_tx_send_wait(link, pend, SMC_LLC_WAIT_TIME);
+put_out:
+ smc_wr_tx_link_put(link);
+ return rc;
}
/********************************* receive ***********************************/
@@ -672,9 +702,11 @@ static int smc_llc_add_link_cont(struct smc_link *link,
struct smc_buf_desc *rmb;
u8 n;
+ if (!smc_wr_tx_link_hold(link))
+ return -ENOLINK;
rc = smc_llc_add_pending_send(link, &wr_buf, &pend);
if (rc)
- return rc;
+ goto put_out;
addc_llc = (struct smc_llc_msg_add_link_cont *)wr_buf;
memset(addc_llc, 0, sizeof(*addc_llc));
@@ -706,7 +738,10 @@ static int smc_llc_add_link_cont(struct smc_link *link,
addc_llc->hd.length = sizeof(struct smc_llc_msg_add_link_cont);
if (lgr->role == SMC_CLNT)
addc_llc->hd.flags |= SMC_LLC_FLAG_RESP;
- return smc_wr_tx_send(link, pend);
+ rc = smc_wr_tx_send(link, pend);
+put_out:
+ smc_wr_tx_link_put(link);
+ return rc;
}
static int smc_llc_cli_rkey_exchange(struct smc_link *link,
@@ -479,7 +479,7 @@ static int smc_tx_rdma_writes(struct smc_connection *conn,
/* Wakeup sndbuf consumers from any context (IRQ or process)
* since there is more data to transmit; usable snd_wnd as max transmit
*/
-static int _smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
+static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
{
struct smc_cdc_producer_flags *pflags = &conn->local_tx_ctrl.prod_flags;
struct smc_link *link = conn->lnk;
@@ -488,8 +488,11 @@ static int _smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
struct smc_wr_buf *wr_buf;
int rc;
+ if (!link || !smc_wr_tx_link_hold(link))
+ return -ENOLINK;
rc = smc_cdc_get_free_slot(conn, link, &wr_buf, &wr_rdma_buf, &pend);
if (rc < 0) {
+ smc_wr_tx_link_put(link);
if (rc == -EBUSY) {
struct smc_sock *smc =
container_of(conn, struct smc_sock, conn);
@@ -530,22 +533,7 @@ static int _smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
out_unlock:
spin_unlock_bh(&conn->send_lock);
- return rc;
-}
-
-static int smcr_tx_sndbuf_nonempty(struct smc_connection *conn)
-{
- struct smc_link *link = conn->lnk;
- int rc = -ENOLINK;
-
- if (!link)
- return rc;
-
- atomic_inc(&link->wr_tx_refcnt);
- if (smc_link_usable(link))
- rc = _smcr_tx_sndbuf_nonempty(conn);
- if (atomic_dec_and_test(&link->wr_tx_refcnt))
- wake_up_all(&link->wr_tx_wait);
+ smc_wr_tx_link_put(link);
return rc;
}
@@ -60,6 +60,20 @@ static inline void smc_wr_tx_set_wr_id(atomic_long_t *wr_tx_id, long val)
atomic_long_set(wr_tx_id, val);
}
+static inline bool smc_wr_tx_link_hold(struct smc_link *link)
+{
+ if (!smc_link_usable(link))
+ return false;
+ atomic_inc(&link->wr_tx_refcnt);
+ return true;
+}
+
+static inline void smc_wr_tx_link_put(struct smc_link *link)
+{
+ if (atomic_dec_and_test(&link->wr_tx_refcnt))
+ wake_up_all(&link->wr_tx_wait);
+}
+
static inline void smc_wr_wakeup_tx_wait(struct smc_link *lnk)
{
wake_up_all(&lnk->wr_tx_wait);