@@ -178,15 +178,11 @@ static inline void *__ptr_ring_peek(struct ptr_ring *r)
*
* NB: This is only safe to call if ring is never resized.
*
- * However, if some other CPU consumes ring entries at the same time, the value
- * returned is not guaranteed to be correct.
- *
- * In this case - to avoid incorrectly detecting the ring
- * as empty - the CPU consuming the ring entries is responsible
- * for either consuming all ring entries until the ring is empty,
- * or synchronizing with some other CPU and causing it to
- * re-test __ptr_ring_empty and/or consume the ring enteries
- * after the synchronization point.
+ * caller might need to use the smp_rmb() to pair with smp_wmb()
+ * or smp_store_release() in __ptr_ring_discard_one() and smp_wmb()
+ * in __ptr_ring_produce() to ensure correct ordering between
+ * __ptr_ring_empty() checking and subsequent operation after
+ * __ptr_ring_empty() checking.
*
* Note: callers invoking this in a loop must use a compiler barrier,
* for example cpu_relax().
@@ -274,7 +270,12 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
if (unlikely(consumer_head >= r->size)) {
r->consumer_tail = 0;
- WRITE_ONCE(r->consumer_head, 0);
+
+ /* Make sure r->queue[0] ~ r->queue[r->consumer_tail]
+ * cleared in previous __ptr_ring_discard_one() is
+ * visible to other cpu.
+ */
+ smp_store_release(&r->consumer_head, 0);
} else {
r->consumer_tail = consumer_head;
WRITE_ONCE(r->consumer_head, consumer_head);
@@ -288,6 +289,14 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
while (likely(--consumer_head >= tail))
r->queue[consumer_head] = NULL;
+ if (unlikely(!r->consumer_head)) {
+ /* Make sure r->queue[r->consumer_tail] ~
+ * r->queue[r->size - 1] cleared above is visible to
+ * other cpu.
+ */
+ smp_wmb();
+ }
+
return;
}
After r->consumer_head is updated in __ptr_ring_discard_one(), r->queue[r->consumer_head] is already cleared in the previous round of __ptr_ring_discard_one(). But there is no guarantee other thread will see the r->queue[r->consumer_head] being NULL because there is no explicit barrier between r->queue[] clearing and r->consumer_head updating. So add two explicit barrier to make sure r->queue[] cleared in __ptr_ring_discard_one() to be visible to other cpu, mainly to make sure the cpu calling the __ptr_ring_empty() will see the correct r->queue[r->consumer_head]. Hopefully the previous and this patch have ensured the correct visibility of r->queue[], so update the comment accordingly about __ptr_ring_empty(). Tested using the "perf stat -r 1000 ./ptr_ring_test -s 1000 -m 1 -N 100000000", comparing the elapsed time: arch unpatched patched improvement arm64 1.888224 sec 1.893673 sec -0.2% X86 2.5422 sec 2.5587 sec -0.6% Reported-by: Michael S. Tsirkin <mst@redhat.com> Signed-off-by: Yunsheng Lin <linyunsheng@huawei.com> --- include/linux/ptr_ring.h | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-)