@@ -126,8 +126,6 @@ int sk_msg_zerocopy_from_iter(struct sock *sk, struct iov_iter *from,
struct sk_msg *msg, u32 bytes);
int sk_msg_memcopy_from_iter(struct sock *sk, struct iov_iter *from,
struct sk_msg *msg, u32 bytes);
-int sk_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags,
- long timeo, int *err);
int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
int len, int flags);
@@ -399,29 +399,6 @@ int sk_msg_memcopy_from_iter(struct sock *sk, struct iov_iter *from,
}
EXPORT_SYMBOL_GPL(sk_msg_memcopy_from_iter);
-int sk_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags,
- long timeo, int *err)
-{
- DEFINE_WAIT_FUNC(wait, woken_wake_function);
- int ret = 0;
-
- if (sk->sk_shutdown & RCV_SHUTDOWN)
- return 1;
-
- if (!timeo)
- return ret;
-
- add_wait_queue(sk_sleep(sk), &wait);
- sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
- ret = sk_wait_event(sk, &timeo,
- !list_empty(&psock->ingress_msg) ||
- !skb_queue_empty(&sk->sk_receive_queue), &wait);
- sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
- remove_wait_queue(sk_sleep(sk), &wait);
- return ret;
-}
-EXPORT_SYMBOL_GPL(sk_msg_wait_data);
-
/* Receive sk_msg from psock->ingress_msg to @msg. */
int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg,
int len, int flags)
@@ -163,6 +163,28 @@ static bool tcp_bpf_stream_read(const struct sock *sk)
return !empty;
}
+static int tcp_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags,
+ long timeo, int *err)
+{
+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
+ int ret = 0;
+
+ if (sk->sk_shutdown & RCV_SHUTDOWN)
+ return 1;
+
+ if (!timeo)
+ return ret;
+
+ add_wait_queue(sk_sleep(sk), &wait);
+ sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ ret = sk_wait_event(sk, &timeo,
+ !list_empty(&psock->ingress_msg) ||
+ !skb_queue_empty(&sk->sk_receive_queue), &wait);
+ sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ remove_wait_queue(sk_sleep(sk), &wait);
+ return ret;
+}
+
static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
int nonblock, int flags, int *addr_len)
{
@@ -188,7 +210,7 @@ static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
long timeo;
timeo = sock_rcvtimeo(sk, nonblock);
- data = sk_msg_wait_data(sk, psock, flags, timeo, &err);
+ data = tcp_msg_wait_data(sk, psock, flags, timeo, &err);
if (data) {
if (!sk_psock_queue_empty(psock))
goto msg_bytes_ready;
@@ -21,6 +21,45 @@ static int sk_udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
return udp_prot.recvmsg(sk, msg, len, noblock, flags, addr_len);
}
+static bool udp_sk_has_data(struct sock *sk)
+{
+ return !skb_queue_empty(&udp_sk(sk)->reader_queue) ||
+ !skb_queue_empty(&sk->sk_receive_queue);
+}
+
+static bool psock_has_data(struct sk_psock *psock)
+{
+ return !skb_queue_empty(&psock->ingress_skb) ||
+ !sk_psock_queue_empty(psock);
+}
+
+#define udp_msg_has_data(__sk, __psock) \
+ ({ udp_sk_has_data(__sk) || psock_has_data(__psock); })
+
+static int udp_msg_wait_data(struct sock *sk, struct sk_psock *psock, int flags,
+ long timeo, int *err)
+{
+ DEFINE_WAIT_FUNC(wait, woken_wake_function);
+ int ret = 0;
+
+ if (sk->sk_shutdown & RCV_SHUTDOWN)
+ return 1;
+
+ if (!timeo)
+ return ret;
+
+ add_wait_queue(sk_sleep(sk), &wait);
+ sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ ret = udp_msg_has_data(sk, psock);
+ if (!ret) {
+ wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
+ ret = udp_msg_has_data(sk, psock);
+ }
+ sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
+ remove_wait_queue(sk_sleep(sk), &wait);
+ return ret;
+}
+
static int udp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
int nonblock, int flags, int *addr_len)
{
@@ -34,8 +73,7 @@ static int udp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
if (unlikely(!psock))
return sk_udp_recvmsg(sk, msg, len, nonblock, flags, addr_len);
- lock_sock(sk);
- if (sk_psock_queue_empty(psock)) {
+ if (!psock_has_data(psock)) {
ret = sk_udp_recvmsg(sk, msg, len, nonblock, flags, addr_len);
goto out;
}
@@ -47,9 +85,9 @@ static int udp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
long timeo;
timeo = sock_rcvtimeo(sk, nonblock);
- data = sk_msg_wait_data(sk, psock, flags, timeo, &err);
+ data = udp_msg_wait_data(sk, psock, flags, timeo, &err);
if (data) {
- if (!sk_psock_queue_empty(psock))
+ if (psock_has_data(psock))
goto msg_bytes_ready;
ret = sk_udp_recvmsg(sk, msg, len, nonblock, flags, addr_len);
goto out;
@@ -62,7 +100,6 @@ static int udp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len,
}
ret = copied;
out:
- release_sock(sk);
sk_psock_put(sk, psock);
return ret;
}