diff mbox series

[v2,6/7] lib/rcu: add least acknowledged token optimization

Message ID 20191008211220.31586-7-honnappa.nagarahalli@arm.com
State New
Headers show
Series typo, doc, simple fixes and some optimizations | expand

Commit Message

Honnappa Nagarahalli Oct. 8, 2019, 9:12 p.m. UTC
When the rte_rcu_qsbr_check API is called, it is possible to
calculate the least valued token acknowledged by all the readers.
When the API is called next time, the readers' token counters do
not need to be scanned if the value of the token being queried is
less than the last least token acknowledged. This avoids the
cache line bounces between readers and writer.

Fixes: 64994b56cfd7 ("rcu: add RCU library supporting QSBR mechanism")
Cc: stable@dpdk.org

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>

Reviewed-by: Gavin Hu <gavin.hu@arm.com>

---
 lib/librte_rcu/rte_rcu_qsbr.c |  4 ++++
 lib/librte_rcu/rte_rcu_qsbr.h | 42 +++++++++++++++++++++++++++++++++++
 2 files changed, 46 insertions(+)

-- 
2.17.1
diff mbox series

Patch

diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
index ce7f93dd3..c9ca66aaa 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.c
+++ b/lib/librte_rcu/rte_rcu_qsbr.c
@@ -73,6 +73,7 @@  rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
 			__RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
 			__RTE_QSBR_THRID_ARRAY_ELM_SIZE;
 	v->token = __RTE_QSBR_CNT_INIT;
+	v->acked_token = __RTE_QSBR_CNT_INIT - 1;
 
 	return 0;
 }
@@ -245,6 +246,9 @@  rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
 	fprintf(f, "  Token = %"PRIu64"\n",
 			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
 
+	fprintf(f, "  Least Acknowledged Token = %"PRIu64"\n",
+			__atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE));
+
 	fprintf(f, "Quiescent State Counts for readers:\n");
 	for (i = 0; i < v->num_elems; i++) {
 		bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
index c80f15c00..3f445ba6c 100644
--- a/lib/librte_rcu/rte_rcu_qsbr.h
+++ b/lib/librte_rcu/rte_rcu_qsbr.h
@@ -83,6 +83,7 @@  struct rte_rcu_qsbr_cnt {
 
 #define __RTE_QSBR_CNT_THR_OFFLINE 0
 #define __RTE_QSBR_CNT_INIT 1
+#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
 
 /* RTE Quiescent State variable structure.
  * This structure has two elements that vary in size based on the
@@ -93,6 +94,10 @@  struct rte_rcu_qsbr_cnt {
 struct rte_rcu_qsbr {
 	uint64_t token __rte_cache_aligned;
 	/**< Counter to allow for multiple concurrent quiescent state queries */
+	uint64_t acked_token;
+	/**< Least token acked by all the threads in the last call to
+	 *   rte_rcu_qsbr_check API.
+	 */
 
 	uint32_t num_elems __rte_cache_aligned;
 	/**< Number of elements in the thread ID array */
@@ -472,6 +477,7 @@  __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 	uint64_t bmap;
 	uint64_t c;
 	uint64_t *reg_thread_id;
+	uint64_t acked_token = __RTE_QSBR_CNT_MAX;
 
 	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
 		i < v->num_elems;
@@ -493,6 +499,7 @@  __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 			__RTE_RCU_DP_LOG(DEBUG,
 				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
 				__func__, t, wait, c, id+j);
+
 			/* Counter is not checked for wrap-around condition
 			 * as it is a 64b counter.
 			 */
@@ -512,10 +519,25 @@  __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 				continue;
 			}
 
+			/* This thread is in quiescent state. Use the counter
+			 * to find the least acknowledged token among all the
+			 * readers.
+			 */
+			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
+				acked_token = c;
+
 			bmap &= ~(1UL << j);
 		}
 	}
 
+	/* All readers are checked, update least acknowledged token.
+	 * There might be multiple writers trying to update this. There is
+	 * no need to update this very accurately using compare-and-swap.
+	 */
+	if (acked_token != __RTE_QSBR_CNT_MAX)
+		__atomic_store_n(&v->acked_token, acked_token,
+			__ATOMIC_RELAXED);
+
 	return 1;
 }
 
@@ -528,6 +550,7 @@  __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 	uint32_t i;
 	struct rte_rcu_qsbr_cnt *cnt;
 	uint64_t c;
+	uint64_t acked_token = __RTE_QSBR_CNT_MAX;
 
 	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
 		__RTE_RCU_DP_LOG(DEBUG,
@@ -538,6 +561,7 @@  __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 			__RTE_RCU_DP_LOG(DEBUG,
 				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
 				__func__, t, wait, c, i);
+
 			/* Counter is not checked for wrap-around condition
 			 * as it is a 64b counter.
 			 */
@@ -550,8 +574,22 @@  __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 
 			rte_pause();
 		}
+
+		/* This thread is in quiescent state. Use the counter to find
+		 * the least acknowledged token among all the readers.
+		 */
+		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
+			acked_token = c;
 	}
 
+	/* All readers are checked, update least acknowledged token.
+	 * There might be multiple writers trying to update this. There is
+	 * no need to update this very accurately using compare-and-swap.
+	 */
+	if (acked_token != __RTE_QSBR_CNT_MAX)
+		__atomic_store_n(&v->acked_token, acked_token,
+			__ATOMIC_RELAXED);
+
 	return 1;
 }
 
@@ -595,6 +633,10 @@  rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 {
 	RTE_ASSERT(v != NULL);
 
+	/* Check if all the readers have already acknowledged this token */
+	if (likely(t <= v->acked_token))
+		return 1;
+
 	if (likely(v->num_threads == v->max_threads))
 		return __rte_rcu_qsbr_check_all(v, t, wait);
 	else