@@ -73,6 +73,7 @@ rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
__RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
__RTE_QSBR_THRID_ARRAY_ELM_SIZE;
v->token = __RTE_QSBR_CNT_INIT;
+ v->acked_token = __RTE_QSBR_CNT_INIT - 1;
return 0;
}
@@ -245,6 +246,9 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
fprintf(f, " Token = %"PRIu64"\n",
__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
+ fprintf(f, " Least Acknowledged Token = %"PRIu64"\n",
+ __atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE));
+
fprintf(f, "Quiescent State Counts for readers:\n");
for (i = 0; i < v->num_elems; i++) {
bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
@@ -83,6 +83,7 @@ struct rte_rcu_qsbr_cnt {
#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
+#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
/* RTE Quiescent State variable structure.
* This structure has two elements that vary in size based on the
@@ -93,6 +94,10 @@ struct rte_rcu_qsbr_cnt {
struct rte_rcu_qsbr {
uint64_t token __rte_cache_aligned;
/**< Counter to allow for multiple concurrent quiescent state queries */
+ uint64_t acked_token;
+ /**< Least token acked by all the threads in the last call to
+ * rte_rcu_qsbr_check API.
+ */
uint32_t num_elems __rte_cache_aligned;
/**< Number of elements in the thread ID array */
@@ -472,6 +477,7 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
uint64_t bmap;
uint64_t c;
uint64_t *reg_thread_id;
+ uint64_t acked_token = __RTE_QSBR_CNT_MAX;
for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
i < v->num_elems;
@@ -493,6 +499,7 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
__RTE_RCU_DP_LOG(DEBUG,
"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
__func__, t, wait, c, id+j);
+
/* Counter is not checked for wrap-around condition
* as it is a 64b counter.
*/
@@ -512,10 +519,25 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
continue;
}
+ /* This thread is in quiescent state. Use the counter
+ * to find the least acknowledged token among all the
+ * readers.
+ */
+ if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
+ acked_token = c;
+
bmap &= ~(1UL << j);
}
}
+ /* All readers are checked, update least acknowledged token.
+ * There might be multiple writers trying to update this. There is
+ * no need to update this very accurately using compare-and-swap.
+ */
+ if (acked_token != __RTE_QSBR_CNT_MAX)
+ __atomic_store_n(&v->acked_token, acked_token,
+ __ATOMIC_RELAXED);
+
return 1;
}
@@ -528,6 +550,7 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
uint32_t i;
struct rte_rcu_qsbr_cnt *cnt;
uint64_t c;
+ uint64_t acked_token = __RTE_QSBR_CNT_MAX;
for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
__RTE_RCU_DP_LOG(DEBUG,
@@ -538,6 +561,7 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
__RTE_RCU_DP_LOG(DEBUG,
"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
__func__, t, wait, c, i);
+
/* Counter is not checked for wrap-around condition
* as it is a 64b counter.
*/
@@ -550,8 +574,22 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
rte_pause();
}
+
+ /* This thread is in quiescent state. Use the counter to find
+ * the least acknowledged token among all the readers.
+ */
+ if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
+ acked_token = c;
}
+ /* All readers are checked, update least acknowledged token.
+ * There might be multiple writers trying to update this. There is
+ * no need to update this very accurately using compare-and-swap.
+ */
+ if (acked_token != __RTE_QSBR_CNT_MAX)
+ __atomic_store_n(&v->acked_token, acked_token,
+ __ATOMIC_RELAXED);
+
return 1;
}
@@ -595,6 +633,10 @@ rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
RTE_ASSERT(v != NULL);
+ /* Check if all the readers have already acknowledged this token */
+ if (likely(t <= v->acked_token))
+ return 1;
+
if (likely(v->num_threads == v->max_threads))
return __rte_rcu_qsbr_check_all(v, t, wait);
else