| Message ID | 1524738868-31318-6-git-send-email-will.deacon@arm.com |
|---|---|
| State | Accepted |
| Commit | 59fb586b4a07b4e1a0ee577140ab4842ba451acd |
| Series | kernel/locking: qspinlock improvements |
On Thu, Apr 26, 2018 at 11:34:19AM +0100, Will Deacon wrote: > @@ -290,58 +312,50 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) > } > > /* > + * If we observe any contention; queue. > + */ > + if (val & ~_Q_LOCKED_MASK) > + goto queue; > + > + /* > * trylock || pending > * > * 0,0,0 -> 0,0,1 ; trylock > * 0,0,1 -> 0,1,1 ; pending > */ > + val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); > + if (!(val & ~_Q_LOCKED_MASK)) { > /* > + * we're pending, wait for the owner to go away. > + * > + * *,1,1 -> *,1,0 Tail must be 0 here, right? > + * > + * this wait loop must be a load-acquire such that we match the > + * store-release that clears the locked bit and create lock > + * sequentiality; this is because not all > + * clear_pending_set_locked() implementations imply full > + * barriers. > */ > + if (val & _Q_LOCKED_MASK) { > + smp_cond_load_acquire(&lock->val.counter, > + !(VAL & _Q_LOCKED_MASK)); > + } > > /* > + * take ownership and clear the pending bit. > + * > + * *,1,0 -> *,0,1 > */ Idem. > + clear_pending_set_locked(lock); > return; > + } > > /* > + * If pending was clear but there are waiters in the queue, then > + * we need to undo our setting of pending before we queue ourselves. > */ > + if (!(val & _Q_PENDING_MASK)) > + clear_pending(lock); This is the branch for when we have !0 tail. > > /* > * End of pending bit optimistic spinning and beginning of MCS > @@ -445,15 +459,15 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) > * claim the lock: > * > * n,0,0 -> 0,0,1 : lock, uncontended > + * *,*,0 -> *,*,1 : lock, contended > * > + * If the queue head is the only one in the queue (lock value == tail) > + * and nobody is pending, clear the tail code and grab the lock. > + * Otherwise, we only need to grab the lock. 
> */ > for (;;) { > /* In the PV case we might already have _Q_LOCKED_VAL set */ > + if ((val & _Q_TAIL_MASK) != tail || (val & _Q_PENDING_MASK)) { > set_locked(lock); > break; > } This one hunk is terrible on the brain. I'm fairly sure I get it, but I feel that comment can use help. Or at least, I need help reading it. I'll try and cook up something when my brain starts working again.
Hi Peter, On Thu, Apr 26, 2018 at 05:53:35PM +0200, Peter Zijlstra wrote: > On Thu, Apr 26, 2018 at 11:34:19AM +0100, Will Deacon wrote: > > @@ -290,58 +312,50 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) > > } > > > > /* > > + * If we observe any contention; queue. > > + */ > > + if (val & ~_Q_LOCKED_MASK) > > + goto queue; > > + > > + /* > > * trylock || pending > > * > > * 0,0,0 -> 0,0,1 ; trylock > > * 0,0,1 -> 0,1,1 ; pending > > */ > > + val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); > > + if (!(val & ~_Q_LOCKED_MASK)) { > > /* > > + * we're pending, wait for the owner to go away. > > + * > > + * *,1,1 -> *,1,0 > > Tail must be 0 here, right? Not necessarily. If we're concurrently setting pending with another slowpath locker, they could queue in the tail behind us, so we can't mess with those upper bits. > > + * > > + * this wait loop must be a load-acquire such that we match the > > + * store-release that clears the locked bit and create lock > > + * sequentiality; this is because not all > > + * clear_pending_set_locked() implementations imply full > > + * barriers. > > */ > > + if (val & _Q_LOCKED_MASK) { > > + smp_cond_load_acquire(&lock->val.counter, > > + !(VAL & _Q_LOCKED_MASK)); > > + } > > > > /* > > + * take ownership and clear the pending bit. > > + * > > + * *,1,0 -> *,0,1 > > */ > > Idem. Same here, hence why clear_pending_set_locked is either a 16-bit store or an RmW (we can't just clobber the tail with 0). > > + clear_pending_set_locked(lock); > > return; > > + } > > > > /* > > + * If pending was clear but there are waiters in the queue, then > > + * we need to undo our setting of pending before we queue ourselves. > > */ > > + if (!(val & _Q_PENDING_MASK)) > > + clear_pending(lock); > > This is the branch for when we have !0 tail. That's the case where "val" has a !0 tail, but I think the comments are trying to talk about the status of the lockword in memory, no? 
> > /* > > * End of pending bit optimistic spinning and beginning of MCS > > > @@ -445,15 +459,15 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) > > * claim the lock: > > * > > * n,0,0 -> 0,0,1 : lock, uncontended > > + * *,*,0 -> *,*,1 : lock, contended > > * > > + * If the queue head is the only one in the queue (lock value == tail) > > + * and nobody is pending, clear the tail code and grab the lock. > > + * Otherwise, we only need to grab the lock. > > */ > > for (;;) { > > /* In the PV case we might already have _Q_LOCKED_VAL set */ > > + if ((val & _Q_TAIL_MASK) != tail || (val & _Q_PENDING_MASK)) { > > set_locked(lock); > > break; > > } > > This one hunk is terrible on the brain. I'm fairly sure I get it, but I > feel that comment can use help. Or at least, I need help reading it. > > I'll try and cook up something when my brain starts working again. Cheers. I think the code is a bit easier to read if you look at it after the whole series is applied, but the comments could probably still be improved. Will
On 04/26/2018 06:34 AM, Will Deacon wrote: > The qspinlock locking slowpath utilises a "pending" bit as a simple form > of an embedded test-and-set lock that can avoid the overhead of explicit > queuing in cases where the lock is held but uncontended. This bit is > managed using a cmpxchg loop which tries to transition the uncontended > lock word from (0,0,0) -> (0,0,1) or (0,0,1) -> (0,1,1). > > Unfortunately, the cmpxchg loop is unbounded and lockers can be starved > indefinitely if the lock word is seen to oscillate between unlocked > (0,0,0) and locked (0,0,1). This could happen if concurrent lockers are > able to take the lock in the cmpxchg loop without queuing and pass it > around amongst themselves. > > This patch fixes the problem by unconditionally setting _Q_PENDING_VAL > using atomic_fetch_or, and then inspecting the old value to see whether > we need to spin on the current lock owner, or whether we now effectively > hold the lock. The tricky scenario is when concurrent lockers end up > queuing on the lock and the lock becomes available, causing us to see > a lockword of (n,0,0). With pending now set, simply queuing could lead > to deadlock as the head of the queue may not have observed the pending > flag being cleared. Conversely, if the head of the queue did observe > pending being cleared, then it could transition the lock from (n,0,0) -> > (0,0,1) meaning that any attempt to "undo" our setting of the pending > bit could race with a concurrent locker trying to set it. > > We handle this race by preserving the pending bit when taking the lock > after reaching the head of the queue and leaving the tail entry intact > if we saw pending set, because we know that the tail is going to be > updated shortly. 
> > Cc: Peter Zijlstra <peterz@infradead.org> > Cc: Ingo Molnar <mingo@kernel.org> > Signed-off-by: Will Deacon <will.deacon@arm.com> > --- > kernel/locking/qspinlock.c | 102 ++++++++++++++++++++---------------- > kernel/locking/qspinlock_paravirt.h | 5 -- > 2 files changed, 58 insertions(+), 49 deletions(-) > > diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c > index a0f7976348f8..ad94b7def005 100644 > --- a/kernel/locking/qspinlock.c > +++ b/kernel/locking/qspinlock.c > @@ -128,6 +128,17 @@ static inline __pure struct mcs_spinlock *decode_tail(u32 tail) > > #if _Q_PENDING_BITS == 8 > /** > + * clear_pending - clear the pending bit. > + * @lock: Pointer to queued spinlock structure > + * > + * *,1,* -> *,0,* > + */ > +static __always_inline void clear_pending(struct qspinlock *lock) > +{ > + WRITE_ONCE(lock->pending, 0); > +} > + > +/** > * clear_pending_set_locked - take ownership and clear the pending bit. > * @lock: Pointer to queued spinlock structure > * > @@ -163,6 +174,17 @@ static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail) > #else /* _Q_PENDING_BITS == 8 */ > > /** > + * clear_pending - clear the pending bit. > + * @lock: Pointer to queued spinlock structure > + * > + * *,1,* -> *,0,* > + */ > +static __always_inline void clear_pending(struct qspinlock *lock) > +{ > + atomic_andnot(_Q_PENDING_VAL, &lock->val); > +} > + > +/** > * clear_pending_set_locked - take ownership and clear the pending bit. > * @lock: Pointer to queued spinlock structure > * > @@ -266,7 +288,7 @@ static __always_inline u32 __pv_wait_head_or_lock(struct qspinlock *lock, > void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) > { > struct mcs_spinlock *prev, *next, *node; > - u32 new, old, tail; > + u32 old, tail; > int idx; > > BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS)); > @@ -290,58 +312,50 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) > } > > /* > + * If we observe any contention; queue. 
> + */ > + if (val & ~_Q_LOCKED_MASK) > + goto queue; > + > + /* > * trylock || pending > * > * 0,0,0 -> 0,0,1 ; trylock > * 0,0,1 -> 0,1,1 ; pending > */ > - for (;;) { > + val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); > + if (!(val & ~_Q_LOCKED_MASK)) { > /* > - * If we observe any contention; queue. > + * we're pending, wait for the owner to go away. > + * > + * *,1,1 -> *,1,0 > + * > + * this wait loop must be a load-acquire such that we match the > + * store-release that clears the locked bit and create lock > + * sequentiality; this is because not all > + * clear_pending_set_locked() implementations imply full > + * barriers. > */ > - if (val & ~_Q_LOCKED_MASK) > - goto queue; > - > - new = _Q_LOCKED_VAL; > - if (val == new) > - new |= _Q_PENDING_VAL; > + if (val & _Q_LOCKED_MASK) { > + smp_cond_load_acquire(&lock->val.counter, > + !(VAL & _Q_LOCKED_MASK)); > + } > > /* > - * Acquire semantic is required here as the function may > - * return immediately if the lock was free. > + * take ownership and clear the pending bit. > + * > + * *,1,0 -> *,0,1 > */ > - old = atomic_cmpxchg_acquire(&lock->val, val, new); > - if (old == val) > - break; > - > - val = old; > - } > - > - /* > - * we won the trylock > - */ > - if (new == _Q_LOCKED_VAL) > + clear_pending_set_locked(lock); > return; > + } > > /* > - * we're pending, wait for the owner to go away. > - * > - * *,1,1 -> *,1,0 > - * > - * this wait loop must be a load-acquire such that we match the > - * store-release that clears the locked bit and create lock > - * sequentiality; this is because not all clear_pending_set_locked() > - * implementations imply full barriers. > - */ > - smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_MASK)); > - > - /* > - * take ownership and clear the pending bit. > - * > - * *,1,0 -> *,0,1 > + * If pending was clear but there are waiters in the queue, then > + * we need to undo our setting of pending before we queue ourselves. 
> */ > - clear_pending_set_locked(lock); > - return; > + if (!(val & _Q_PENDING_MASK)) > + clear_pending(lock); > > /* > * End of pending bit optimistic spinning and beginning of MCS > @@ -445,15 +459,15 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) > * claim the lock: > * > * n,0,0 -> 0,0,1 : lock, uncontended > - * *,0,0 -> *,0,1 : lock, contended > + * *,*,0 -> *,*,1 : lock, contended > * > - * If the queue head is the only one in the queue (lock value == tail), > - * clear the tail code and grab the lock. Otherwise, we only need > - * to grab the lock. > + * If the queue head is the only one in the queue (lock value == tail) > + * and nobody is pending, clear the tail code and grab the lock. > + * Otherwise, we only need to grab the lock. > */ > for (;;) { > /* In the PV case we might already have _Q_LOCKED_VAL set */ > - if ((val & _Q_TAIL_MASK) != tail) { > + if ((val & _Q_TAIL_MASK) != tail || (val & _Q_PENDING_MASK)) { > set_locked(lock); > break; > } > diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h > index 2711940429f5..2dbad2f25480 100644 > --- a/kernel/locking/qspinlock_paravirt.h > +++ b/kernel/locking/qspinlock_paravirt.h > @@ -118,11 +118,6 @@ static __always_inline void set_pending(struct qspinlock *lock) > WRITE_ONCE(lock->pending, 1); > } > > -static __always_inline void clear_pending(struct qspinlock *lock) > -{ > - WRITE_ONCE(lock->pending, 0); > -} > - > /* > * The pending bit check in pv_queued_spin_steal_lock() isn't a memory > * barrier. Therefore, an atomic cmpxchg_acquire() is used to acquire the There is another clear_pending() function after the "#else /* _Q_PENDING_BITS == 8 */" line that need to be removed as well. -Longman
Hi Waiman, On Thu, Apr 26, 2018 at 04:16:30PM -0400, Waiman Long wrote: > On 04/26/2018 06:34 AM, Will Deacon wrote: > > diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h > > index 2711940429f5..2dbad2f25480 100644 > > --- a/kernel/locking/qspinlock_paravirt.h > > +++ b/kernel/locking/qspinlock_paravirt.h > > @@ -118,11 +118,6 @@ static __always_inline void set_pending(struct qspinlock *lock) > > WRITE_ONCE(lock->pending, 1); > > } > > > > -static __always_inline void clear_pending(struct qspinlock *lock) > > -{ > > - WRITE_ONCE(lock->pending, 0); > > -} > > - > > /* > > * The pending bit check in pv_queued_spin_steal_lock() isn't a memory > > * barrier. Therefore, an atomic cmpxchg_acquire() is used to acquire the > > There is another clear_pending() function after the "#else /* > _Q_PENDING_BITS == 8 */" line that need to be removed as well. Bugger, sorry I missed that one. Is the >= 16K CPUs case supported elsewhere in Linux? The x86 Kconfig appears to clamp NR_CPUS to 8192 iiuc. Anyway, additional patch below. Ingo -- please can you apply this on top? Thanks, Will --->8 From ef6aa51e47047fe1a57dfdbe2f45caf63fa95be4 Mon Sep 17 00:00:00 2001 From: Will Deacon <will.deacon@arm.com> Date: Fri, 27 Apr 2018 10:40:13 +0100 Subject: [PATCH] locking/qspinlock: Remove duplicate clear_pending function from PV code The native clear_pending function is identical to the PV version, so the latter can simply be removed. This fixes the build for systems with >= 16K CPUs using the PV lock implementation. 
Cc: Ingo Molnar <mingo@kernel.org> Cc: Peter Zijlstra <peterz@infradead.org> Reported-by: Waiman Long <longman@redhat.com> Signed-off-by: Will Deacon <will.deacon@arm.com> --- kernel/locking/qspinlock_paravirt.h | 5 ----- 1 file changed, 5 deletions(-) diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h index 25730b2ac022..5a0cf5f9008c 100644 --- a/kernel/locking/qspinlock_paravirt.h +++ b/kernel/locking/qspinlock_paravirt.h @@ -130,11 +130,6 @@ static __always_inline void set_pending(struct qspinlock *lock) atomic_or(_Q_PENDING_VAL, &lock->val); } -static __always_inline void clear_pending(struct qspinlock *lock) -{ - atomic_andnot(_Q_PENDING_VAL, &lock->val); -} - static __always_inline int trylock_clear_pending(struct qspinlock *lock) { int val = atomic_read(&lock->val); -- 2.1.4
On 04/27/2018 06:16 AM, Will Deacon wrote: > Hi Waiman, > > On Thu, Apr 26, 2018 at 04:16:30PM -0400, Waiman Long wrote: >> On 04/26/2018 06:34 AM, Will Deacon wrote: >>> diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h >>> index 2711940429f5..2dbad2f25480 100644 >>> --- a/kernel/locking/qspinlock_paravirt.h >>> +++ b/kernel/locking/qspinlock_paravirt.h >>> @@ -118,11 +118,6 @@ static __always_inline void set_pending(struct qspinlock *lock) >>> WRITE_ONCE(lock->pending, 1); >>> } >>> >>> -static __always_inline void clear_pending(struct qspinlock *lock) >>> -{ >>> - WRITE_ONCE(lock->pending, 0); >>> -} >>> - >>> /* >>> * The pending bit check in pv_queued_spin_steal_lock() isn't a memory >>> * barrier. Therefore, an atomic cmpxchg_acquire() is used to acquire the >> There is another clear_pending() function after the "#else /* >> _Q_PENDING_BITS == 8 */" line that need to be removed as well. > Bugger, sorry I missed that one. Is the >= 16K CPUs case supported elsewhere > in Linux? The x86 Kconfig appears to clamp NR_CPUS to 8192 iiuc. > > Anyway, additional patch below. Ingo -- please can you apply this on top? > I don't think we support >= 16k in any of the distros. However, this will be a limit that we will reach eventually. That is why I said we can wait. Cheers, Longman
On Thu, Apr 26, 2018 at 05:55:19PM +0100, Will Deacon wrote: > Hi Peter, > > On Thu, Apr 26, 2018 at 05:53:35PM +0200, Peter Zijlstra wrote: > > On Thu, Apr 26, 2018 at 11:34:19AM +0100, Will Deacon wrote: > > > @@ -290,58 +312,50 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) > > > } > > > > > > /* > > > + * If we observe any contention; queue. > > > + */ > > > + if (val & ~_Q_LOCKED_MASK) > > > + goto queue; > > > + > > > + /* > > > * trylock || pending > > > * > > > * 0,0,0 -> 0,0,1 ; trylock > > > * 0,0,1 -> 0,1,1 ; pending > > > */ > > > + val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); > > > + if (!(val & ~_Q_LOCKED_MASK)) { > > > /* > > > + * we're pending, wait for the owner to go away. > > > + * > > > + * *,1,1 -> *,1,0 > > > > Tail must be 0 here, right? > > Not necessarily. If we're concurrently setting pending with another slowpath > locker, they could queue in the tail behind us, so we can't mess with those > upper bits. Could be my brain just entirely stopped working; but I read that as: !(val & ~0xFF) := !(val & 0xFFFFFF00) which then pretty much mandates the top bits are empty, no?
On Sat, Apr 28, 2018 at 02:45:37PM +0200, Peter Zijlstra wrote: > On Thu, Apr 26, 2018 at 05:55:19PM +0100, Will Deacon wrote: > > On Thu, Apr 26, 2018 at 05:53:35PM +0200, Peter Zijlstra wrote: > > > On Thu, Apr 26, 2018 at 11:34:19AM +0100, Will Deacon wrote: > > > > @@ -290,58 +312,50 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) > > > > } > > > > > > > > /* > > > > + * If we observe any contention; queue. > > > > + */ > > > > + if (val & ~_Q_LOCKED_MASK) > > > > + goto queue; > > > > + > > > > + /* > > > > * trylock || pending > > > > * > > > > * 0,0,0 -> 0,0,1 ; trylock > > > > * 0,0,1 -> 0,1,1 ; pending > > > > */ > > > > + val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); > > > > + if (!(val & ~_Q_LOCKED_MASK)) { > > > > /* > > > > + * we're pending, wait for the owner to go away. > > > > + * > > > > + * *,1,1 -> *,1,0 > > > > > > Tail must be 0 here, right? > > > > Not necessarily. If we're concurrently setting pending with another slowpath > > locker, they could queue in the tail behind us, so we can't mess with those > > upper bits. > > Could be my brain just entirely stopped working; but I read that as: > > !(val & ~0xFF) := !(val & 0xFFFFFF00) > > which then pretty much mandates the top bits are empty, no? Only if there isn't a concurrent locker. For example: T0: // fastpath fails to acquire the lock, returns val == _Q_LOCKED_VAL if (val & ~_Q_LOCKED_MASK) goto queue; // Fallthrough T1: // fastpath fails to acquire the lock, returns val == _Q_LOCKED_VAL if (val & ~_Q_LOCKED_MASK) goto queue; // Fallthrough T0: val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); // val == _Q_LOCKED_VAL T1: val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); // val == _Q_PENDING_VAL | _Q_LOCKED_VAL // Queue into tail T0: // Spins for _Q_LOCKED_MASK to go to zero, but tail is *non-zero* So it's really down to whether the state transitions in the comments refer to the lockword in memory, or the local "val" variable. 
I think the former is more instructive, because the whole thing is concurrent. Will
diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c index a0f7976348f8..ad94b7def005 100644 --- a/kernel/locking/qspinlock.c +++ b/kernel/locking/qspinlock.c @@ -128,6 +128,17 @@ static inline __pure struct mcs_spinlock *decode_tail(u32 tail) #if _Q_PENDING_BITS == 8 /** + * clear_pending - clear the pending bit. + * @lock: Pointer to queued spinlock structure + * + * *,1,* -> *,0,* + */ +static __always_inline void clear_pending(struct qspinlock *lock) +{ + WRITE_ONCE(lock->pending, 0); +} + +/** * clear_pending_set_locked - take ownership and clear the pending bit. * @lock: Pointer to queued spinlock structure * @@ -163,6 +174,17 @@ static __always_inline u32 xchg_tail(struct qspinlock *lock, u32 tail) #else /* _Q_PENDING_BITS == 8 */ /** + * clear_pending - clear the pending bit. + * @lock: Pointer to queued spinlock structure + * + * *,1,* -> *,0,* + */ +static __always_inline void clear_pending(struct qspinlock *lock) +{ + atomic_andnot(_Q_PENDING_VAL, &lock->val); +} + +/** * clear_pending_set_locked - take ownership and clear the pending bit. * @lock: Pointer to queued spinlock structure * @@ -266,7 +288,7 @@ static __always_inline u32 __pv_wait_head_or_lock(struct qspinlock *lock, void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) { struct mcs_spinlock *prev, *next, *node; - u32 new, old, tail; + u32 old, tail; int idx; BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS)); @@ -290,58 +312,50 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) } /* + * If we observe any contention; queue. + */ + if (val & ~_Q_LOCKED_MASK) + goto queue; + + /* * trylock || pending * * 0,0,0 -> 0,0,1 ; trylock * 0,0,1 -> 0,1,1 ; pending */ - for (;;) { + val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val); + if (!(val & ~_Q_LOCKED_MASK)) { /* - * If we observe any contention; queue. + * we're pending, wait for the owner to go away. 
+ * + * *,1,1 -> *,1,0 + * + * this wait loop must be a load-acquire such that we match the + * store-release that clears the locked bit and create lock + * sequentiality; this is because not all + * clear_pending_set_locked() implementations imply full + * barriers. */ - if (val & ~_Q_LOCKED_MASK) - goto queue; - - new = _Q_LOCKED_VAL; - if (val == new) - new |= _Q_PENDING_VAL; + if (val & _Q_LOCKED_MASK) { + smp_cond_load_acquire(&lock->val.counter, + !(VAL & _Q_LOCKED_MASK)); + } /* - * Acquire semantic is required here as the function may - * return immediately if the lock was free. + * take ownership and clear the pending bit. + * + * *,1,0 -> *,0,1 */ - old = atomic_cmpxchg_acquire(&lock->val, val, new); - if (old == val) - break; - - val = old; - } - - /* - * we won the trylock - */ - if (new == _Q_LOCKED_VAL) + clear_pending_set_locked(lock); return; + } /* - * we're pending, wait for the owner to go away. - * - * *,1,1 -> *,1,0 - * - * this wait loop must be a load-acquire such that we match the - * store-release that clears the locked bit and create lock - * sequentiality; this is because not all clear_pending_set_locked() - * implementations imply full barriers. - */ - smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_MASK)); - - /* - * take ownership and clear the pending bit. - * - * *,1,0 -> *,0,1 + * If pending was clear but there are waiters in the queue, then + * we need to undo our setting of pending before we queue ourselves. */ - clear_pending_set_locked(lock); - return; + if (!(val & _Q_PENDING_MASK)) + clear_pending(lock); /* * End of pending bit optimistic spinning and beginning of MCS @@ -445,15 +459,15 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) * claim the lock: * * n,0,0 -> 0,0,1 : lock, uncontended - * *,0,0 -> *,0,1 : lock, contended + * *,*,0 -> *,*,1 : lock, contended * - * If the queue head is the only one in the queue (lock value == tail), - * clear the tail code and grab the lock. 
Otherwise, we only need - * to grab the lock. + * If the queue head is the only one in the queue (lock value == tail) + * and nobody is pending, clear the tail code and grab the lock. + * Otherwise, we only need to grab the lock. */ for (;;) { /* In the PV case we might already have _Q_LOCKED_VAL set */ - if ((val & _Q_TAIL_MASK) != tail) { + if ((val & _Q_TAIL_MASK) != tail || (val & _Q_PENDING_MASK)) { set_locked(lock); break; } diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h index 2711940429f5..2dbad2f25480 100644 --- a/kernel/locking/qspinlock_paravirt.h +++ b/kernel/locking/qspinlock_paravirt.h @@ -118,11 +118,6 @@ static __always_inline void set_pending(struct qspinlock *lock) WRITE_ONCE(lock->pending, 1); } -static __always_inline void clear_pending(struct qspinlock *lock) -{ - WRITE_ONCE(lock->pending, 0); -} - /* * The pending bit check in pv_queued_spin_steal_lock() isn't a memory * barrier. Therefore, an atomic cmpxchg_acquire() is used to acquire the
The qspinlock locking slowpath utilises a "pending" bit as a simple form of an embedded test-and-set lock that can avoid the overhead of explicit queuing in cases where the lock is held but uncontended. This bit is managed using a cmpxchg loop which tries to transition the uncontended lock word from (0,0,0) -> (0,0,1) or (0,0,1) -> (0,1,1). Unfortunately, the cmpxchg loop is unbounded and lockers can be starved indefinitely if the lock word is seen to oscillate between unlocked (0,0,0) and locked (0,0,1). This could happen if concurrent lockers are able to take the lock in the cmpxchg loop without queuing and pass it around amongst themselves. This patch fixes the problem by unconditionally setting _Q_PENDING_VAL using atomic_fetch_or, and then inspecting the old value to see whether we need to spin on the current lock owner, or whether we now effectively hold the lock. The tricky scenario is when concurrent lockers end up queuing on the lock and the lock becomes available, causing us to see a lockword of (n,0,0). With pending now set, simply queuing could lead to deadlock as the head of the queue may not have observed the pending flag being cleared. Conversely, if the head of the queue did observe pending being cleared, then it could transition the lock from (n,0,0) -> (0,0,1) meaning that any attempt to "undo" our setting of the pending bit could race with a concurrent locker trying to set it. We handle this race by preserving the pending bit when taking the lock after reaching the head of the queue and leaving the tail entry intact if we saw pending set, because we know that the tail is going to be updated shortly. Cc: Peter Zijlstra <peterz@infradead.org> Cc: Ingo Molnar <mingo@kernel.org> Signed-off-by: Will Deacon <will.deacon@arm.com> --- kernel/locking/qspinlock.c | 102 ++++++++++++++++++++---------------- kernel/locking/qspinlock_paravirt.h | 5 -- 2 files changed, 58 insertions(+), 49 deletions(-) -- 2.1.4