diff mbox series

[net-next,2/2] net: qrtr: Add support for processing DEL_PROC type control message

Message ID 1693563621-1920-3-git-send-email-quic_srichara@quicinc.com
State New
Headers show
Series net: qrtr: Few qrtr fixes | expand

Commit Message

Sricharan R Sept. 1, 2023, 10:20 a.m. UTC
For certain rproc's like modem, when it goes down and endpoint gets
un-registered, DEL_PROC control message gets forwarded to other
remote nodes. So remote nodes should listen on the message,
wakeup all local waiters waiting for tx_resume notifications
(which will never come) and also forward the message to all
local qrtr sockets like QMI etc. Adding the support here.

Introduced a new rx worker here, because endpoint_post can get called in
atomic contexts, but processing of DEL_PROC needs to acquire node
qrtr_tx mutex.

Signed-off-by: Sricharan Ramabadhran <quic_srichara@quicinc.com>
---
Right now DEL_PROC is sent only by some legacy targets, latest uses
only _BYE signalling for local observers only. So later that needs to
be changed to broadcast and do the same DEL_PROC processing.

 include/uapi/linux/qrtr.h |  1 +
 net/qrtr/af_qrtr.c        | 65 +++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+)
diff mbox series

Patch

diff --git a/include/uapi/linux/qrtr.h b/include/uapi/linux/qrtr.h
index f7e2fb3..1c92015 100644
--- a/include/uapi/linux/qrtr.h
+++ b/include/uapi/linux/qrtr.h
@@ -26,6 +26,7 @@  enum qrtr_pkt_type {
 	QRTR_TYPE_PING          = 9,
 	QRTR_TYPE_NEW_LOOKUP	= 10,
 	QRTR_TYPE_DEL_LOOKUP	= 11,
+	QRTR_TYPE_DEL_PROC	= 13,
 };
 
 struct qrtr_ctrl_pkt {
diff --git a/net/qrtr/af_qrtr.c b/net/qrtr/af_qrtr.c
index 26197a0..426cea0 100644
--- a/net/qrtr/af_qrtr.c
+++ b/net/qrtr/af_qrtr.c
@@ -3,6 +3,7 @@ 
  * Copyright (c) 2015, Sony Mobile Communications Inc.
  * Copyright (c) 2013, The Linux Foundation. All rights reserved.
  */
+#include <linux/kthread.h>
 #include <linux/module.h>
 #include <linux/netlink.h>
 #include <linux/qrtr.h>
@@ -122,6 +123,9 @@  static DEFINE_XARRAY_ALLOC(qrtr_ports);
  * @qrtr_tx_lock: lock for qrtr_tx_flow inserts
  * @rx_queue: receive queue
  * @item: list item for broadcast list
+ * @kworker: worker thread for recv work
+ * @task: task to run the worker thread
+ * @read_data: scheduled work for recv work
  */
 struct qrtr_node {
 	struct mutex ep_lock;
@@ -134,6 +138,9 @@  struct qrtr_node {
 
 	struct sk_buff_head rx_queue;
 	struct list_head item;
+	struct kthread_worker kworker;
+	struct task_struct *task;
+	struct kthread_work read_data;
 };
 
 /**
@@ -186,6 +193,9 @@  static void __qrtr_node_release(struct kref *kref)
 	list_del(&node->item);
 	mutex_unlock(&qrtr_node_lock);
 
+	kthread_flush_worker(&node->kworker);
+	kthread_stop(node->task);
+
 	skb_queue_purge(&node->rx_queue);
 
 	/* Free tx flow counters */
@@ -526,6 +536,9 @@  int qrtr_endpoint_post(struct qrtr_endpoint *ep, const void *data, size_t len)
 
 	if (cb->type == QRTR_TYPE_RESUME_TX) {
 		qrtr_tx_resume(node, skb);
+	} else if (cb->type == QRTR_TYPE_DEL_PROC) {
+		skb_queue_tail(&node->rx_queue, skb);
+		kthread_queue_work(&node->kworker, &node->read_data);
 	} else {
 		ipc = qrtr_port_lookup(cb->dst_port);
 		if (!ipc)
@@ -574,6 +587,50 @@  static struct sk_buff *qrtr_alloc_ctrl_packet(struct qrtr_ctrl_pkt **pkt,
 	return skb;
 }
 
+/* Handle DEL_PROC control message */
+static void qrtr_node_rx_work(struct kthread_work *work)
+{
+	struct qrtr_node *node = container_of(work, struct qrtr_node,
+					      read_data);
+	struct qrtr_ctrl_pkt *pkt;
+	void __rcu **slot;
+	struct radix_tree_iter iter;
+	struct qrtr_tx_flow *flow;
+	struct sk_buff *skb;
+	struct qrtr_sock *ipc;
+
+	while ((skb = skb_dequeue(&node->rx_queue)) != NULL) {
+		struct qrtr_cb *cb = (struct qrtr_cb *)skb->cb;
+
+		ipc = qrtr_port_lookup(cb->dst_port);
+		if (!ipc) {
+			kfree_skb(skb);
+			continue;
+		}
+
+		if (cb->type == QRTR_TYPE_DEL_PROC) {
+			/* Free tx flow counters */
+			mutex_lock(&node->qrtr_tx_lock);
+			radix_tree_for_each_slot(slot, &node->qrtr_tx_flow, &iter, 0) {
+				flow = *slot;
+				wake_up_interruptible_all(&flow->resume_tx);
+			}
+			mutex_unlock(&node->qrtr_tx_lock);
+
+			/* Translate DEL_PROC to BYE for local enqueue */
+			cb->type = QRTR_TYPE_BYE;
+			pkt = (struct qrtr_ctrl_pkt *)skb->data;
+			memset(pkt, 0, sizeof(*pkt));
+			pkt->cmd = cpu_to_le32(QRTR_TYPE_BYE);
+
+			if (sock_queue_rcv_skb(&ipc->sk, skb))
+				kfree_skb(skb);
+
+			qrtr_port_put(ipc);
+		}
+	}
+}
+
 /**
  * qrtr_endpoint_register() - register a new endpoint
  * @ep: endpoint to register
@@ -599,6 +656,14 @@  int qrtr_endpoint_register(struct qrtr_endpoint *ep, unsigned int nid)
 	node->nid = QRTR_EP_NID_AUTO;
 	node->ep = ep;
 
+	kthread_init_work(&node->read_data, qrtr_node_rx_work);
+	kthread_init_worker(&node->kworker);
+	node->task = kthread_run(kthread_worker_fn, &node->kworker, "qrtr_rx");
+	if (IS_ERR(node->task)) {
+		kfree(node);
+		return -ENOMEM;
+	}
+
 	INIT_RADIX_TREE(&node->qrtr_tx_flow, GFP_KERNEL);
 	mutex_init(&node->qrtr_tx_lock);