Message ID | Y+0W0wgyaJqYHKoj@linutronix.de
---|---
State | New
Series | [v5] locking/rwbase: Mitigate indefinite writer starvation
On Wed, Feb 15 2023 at 18:30, Sebastian Andrzej Siewior wrote:
> diff --git a/include/linux/rwbase_rt.h b/include/linux/rwbase_rt.h
> index 1d264dd086250..b969b1d9bb85c 100644
> --- a/include/linux/rwbase_rt.h
> +++ b/include/linux/rwbase_rt.h
> @@ -10,12 +10,14 @@
>
>  struct rwbase_rt {
>  	atomic_t		readers;
> +	unsigned long		waiter_timeout;

I'm still not convinced that this timeout has any value and if it has
then it should be clearly named writer_timeout because that's what it
is about.

The only reason for this timeout I saw so far is:

> +/*
> + * Allow reader bias with a pending writer for a minimum of 4ms or 1 tick. This
> + * matches RWSEM_WAIT_TIMEOUT for the generic RWSEM implementation.

Clearly RT and !RT have completely different implementations and
behaviour vs. rwsems and rwlocks. Just because !RT has a timeout does
not make a good argument.

Just for the record: !RT has the timeout applicable in both directions
to prevent writer bias via lock stealing. That's not a problem for RT
because?

Can we finally get a proper justification for this?

> @@ -264,12 +285,20 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
>  		if (__rwbase_write_trylock(rwb))
>  			break;
>
> +		/*
> +		 * Record timeout when reader bias is ignored. Ensure timeout
> +		 * is at least 1 in case of overflow.
> +		 */
> +		rwb->waiter_timeout = (jiffies + RWBASE_RT_WAIT_TIMEOUT) | 1;
> +

So this has two sillies:

  1) It resets the timeout once per loop which is plain wrong

  2) The "| 1" is really a sloppy hack

Why not doing the obvious:

static bool __sched rwbase_allow_reader_bias(struct rwbase_rt *rwb)
{
	int r = atomic_read(&rwb->readers);

	if (likely(r < 0))
		return true;

	if (r == WRITER_BIAS)
		return false;

	/* Allow reader bias unless the writer timeout has expired. */
	return time_before(jiffies, rwb->writer_timeout);
}

and with that the "| 1" and all the rwb->timeout = 0 nonsense simply
goes away and rwbase_read_lock() becomes:

	if (rwbase_allow_reader_bias(rwb)) {
		// fastpath
		atomic_inc(&rwb->readers);
		raw_spin_unlock_irq(&rtm->wait_lock);
		return 0;
	}
	// slowpath

and the writelock slowpath has:

	rwb->writer_timeout = jiffies + RWBASE_RT_WAIT_TIMEOUT;

	for (;;) {
	     ....

No?

Thanks,

        tglx
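A small user-space sketch (an illustration, not part of the thread) of why the
jiffies comparison needs no nonzero sentinel on its own: the time_before()-style
check below is wraparound-safe even when the computed deadline wraps to a raw
value of 0. The "| 1" workaround is only needed when 0 doubles as a "no writer
blocked" marker, which the variant suggested above avoids by deriving that
state from the readers count. The macro is a simplified stand-in for the
kernel helper of the same name.

#include <limits.h>
#include <stdio.h>

/* Simplified stand-in for the kernel's wraparound-safe time_before() */
#define time_before(a, b)	((long)((a) - (b)) < 0)

int main(void)
{
	unsigned long now = ULONG_MAX - 3;	/* "jiffies" just before the wrap */
	unsigned long deadline = now + 4;	/* wraps around to exactly 0 */

	printf("deadline raw value:  %lu\n", deadline);
	printf("still within window: %d\n", time_before(now, deadline));	/* prints 1 */
	printf("8 ticks later:       %d\n", time_before(now + 8, deadline));	/* prints 0 */
	return 0;
}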
On 2023-02-21 00:55:33 [+0100], Thomas Gleixner wrote:
> On Wed, Feb 15 2023 at 18:30, Sebastian Andrzej Siewior wrote:
> > diff --git a/include/linux/rwbase_rt.h b/include/linux/rwbase_rt.h
> > index 1d264dd086250..b969b1d9bb85c 100644
> > --- a/include/linux/rwbase_rt.h
> > +++ b/include/linux/rwbase_rt.h
> > @@ -10,12 +10,14 @@
> >
> >  struct rwbase_rt {
> >  	atomic_t		readers;
> > +	unsigned long		waiter_timeout;
>
> I'm still not convinced that this timeout has any value and if it has
> then it should be clearly named writer_timeout because that's what it
> is about.
>
> The only reason for this timeout I saw so far is:
>
> > +/*
> > + * Allow reader bias with a pending writer for a minimum of 4ms or 1 tick. This
> > + * matches RWSEM_WAIT_TIMEOUT for the generic RWSEM implementation.
>
> Clearly RT and !RT have completely different implementations and
> behaviour vs. rwsems and rwlocks. Just because !RT has a timeout does
> not make a good argument.
>
> Just for the record: !RT has the timeout applicable in both directions
> to prevent writer bias via lock stealing. That's not a problem for RT
> because?

Once the writer has acquired the lock, all further readers and writers
queue up on the rtmutex and are processed one after the other. It only
becomes a problem once a writer with elevated priority acquires the
lock repeatedly, to the point that tasks with lower priority starve.

> Can we finally get a proper justification for this?

The goal is to avoid writer starvation, which is caused by having at
least one reader in the critical section at all times so that the
writer can never make progress. Reader starvation is not an issue
because all tasks line up on the rtmutex and are processed in FIFO
order. Tasks with elevated priority are preferred, which can lead to
starvation of tasks with lower priority. This is by design and can
happen with other lock types, too.

> > @@ -264,12 +285,20 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
> >  		if (__rwbase_write_trylock(rwb))
> >  			break;
> >
> > +		/*
> > +		 * Record timeout when reader bias is ignored. Ensure timeout
> > +		 * is at least 1 in case of overflow.
> > +		 */
> > +		rwb->waiter_timeout = (jiffies + RWBASE_RT_WAIT_TIMEOUT) | 1;
> > +
>
> So this has two sillies:
>
>   1) It resets the timeout once per loop which is plain wrong
>
>   2) The "| 1" is really a sloppy hack
>
> Why not doing the obvious:

Sure. Let me look at this once we have agreed on the justification.

…

> Thanks,
>
>         tglx

Sebastian
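The starvation scenario described above can be reproduced in user space with a
reader-preferring rwlock. The sketch below is an analogy only, not RT kernel
code; it uses the glibc pthread_rwlockattr_setkind_np() extension, keeps the
read side occupied with overlapping readers, and measures how long a writer
waits. The writer typically only gets through once the readers stop, which is
exactly the behaviour the proposed timeout is meant to bound.

#define _GNU_SOURCE
#include <pthread.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

static pthread_rwlock_t lock;
static volatile int readers_run = 1;

/* Re-acquire almost immediately so the read side practically never drains. */
static void *reader(void *arg)
{
	while (readers_run) {
		pthread_rwlock_rdlock(&lock);
		usleep(1000);
		pthread_rwlock_unlock(&lock);
	}
	return NULL;
}

static void *stop_readers(void *arg)
{
	sleep(2);		/* let the writer starve for roughly 2 seconds */
	readers_run = 0;
	return NULL;
}

int main(void)
{
	pthread_rwlockattr_t attr;
	pthread_t r[4], stopper;
	struct timespec t0, t1;

	/* Reader preference mimics the "readers always win" reader bias. */
	pthread_rwlockattr_init(&attr);
	pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_READER_NP);
	pthread_rwlock_init(&lock, &attr);

	for (int i = 0; i < 4; i++)
		pthread_create(&r[i], NULL, reader, NULL);
	pthread_create(&stopper, NULL, stop_readers, NULL);

	clock_gettime(CLOCK_MONOTONIC, &t0);
	pthread_rwlock_wrlock(&lock);	/* blocked while readers overlap */
	clock_gettime(CLOCK_MONOTONIC, &t1);
	pthread_rwlock_unlock(&lock);

	printf("writer waited %.2f s\n",
	       (t1.tv_sec - t0.tv_sec) + (t1.tv_nsec - t0.tv_nsec) / 1e9);

	for (int i = 0; i < 4; i++)
		pthread_join(r[i], NULL);
	pthread_join(stopper, NULL);
	return 0;
}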
diff --git a/include/linux/rwbase_rt.h b/include/linux/rwbase_rt.h
index 1d264dd086250..b969b1d9bb85c 100644
--- a/include/linux/rwbase_rt.h
+++ b/include/linux/rwbase_rt.h
@@ -10,12 +10,14 @@
 
 struct rwbase_rt {
 	atomic_t		readers;
+	unsigned long		waiter_timeout;
 	struct rt_mutex_base	rtmutex;
 };
 
 #define __RWBASE_INITIALIZER(name)				\
 {								\
 	.readers = ATOMIC_INIT(READER_BIAS),			\
+	.waiter_timeout = 0,					\
 	.rtmutex = __RT_MUTEX_BASE_INITIALIZER(name.rtmutex),	\
 }
 
@@ -23,6 +25,7 @@ struct rwbase_rt {
 	do {							\
 		rt_mutex_base_init(&(rwbase)->rtmutex);		\
 		atomic_set(&(rwbase)->readers, READER_BIAS);	\
+		(rwbase)->waiter_timeout = 0;			\
 	} while (0)
 
diff --git a/kernel/locking/rwbase_rt.c b/kernel/locking/rwbase_rt.c
index c201aadb93017..dfd133672de86 100644
--- a/kernel/locking/rwbase_rt.c
+++ b/kernel/locking/rwbase_rt.c
@@ -38,8 +38,10 @@
  * Implementing the one by one reader boosting/handover mechanism is a
  * major surgery for a very dubious value.
  *
- * The risk of writer starvation is there, but the pathological use cases
- * which trigger it are not necessarily the typical RT workloads.
+ * Writer starvation is avoided by forcing all reader acquisitions into the slow
+ * path once the writer is blocked for more than RWBASE_RT_WAIT_TIMEOUT jiffies.
+ * The writer owns the rtmutex at the time it sets the timeout which guarantees
+ * that it will be the new lock owner once all active reader leave.
  *
  * Fast-path orderings:
  * The lock/unlock of readers can run in fast paths: lock and unlock are only
@@ -65,6 +67,22 @@ static __always_inline int rwbase_read_trylock(struct rwbase_rt *rwb)
 	return 0;
 }
 
+/*
+ * Allow reader bias with a pending writer for a minimum of 4ms or 1 tick. This
+ * matches RWSEM_WAIT_TIMEOUT for the generic RWSEM implementation.
+ */
+#define RWBASE_RT_WAIT_TIMEOUT		DIV_ROUND_UP(HZ, 250)
+
+static bool __sched rwbase_allow_reader_bias(struct rwbase_rt *rwb)
+{
+	/* Allow reader bias if no writer is blocked. */
+	if (!rwb->waiter_timeout)
+		return true;
+
+	/* Allow reader bias unless a writer timeout has expired. */
+	return time_before(jiffies, rwb->waiter_timeout);
+}
+
 static int __sched __rwbase_read_lock(struct rwbase_rt *rwb,
 				      unsigned int state)
 {
@@ -74,9 +92,11 @@ static int __sched __rwbase_read_lock(struct rwbase_rt *rwb,
 	raw_spin_lock_irq(&rtm->wait_lock);
 	/*
 	 * Allow readers, as long as the writer has not completely
-	 * acquired the semaphore for write.
+	 * acquired the semaphore for write and reader bias is still
+	 * allowed.
 	 */
-	if (atomic_read(&rwb->readers) != WRITER_BIAS) {
+	if (atomic_read(&rwb->readers) != WRITER_BIAS &&
+	    rwbase_allow_reader_bias(rwb)) {
 		atomic_inc(&rwb->readers);
 		raw_spin_unlock_irq(&rtm->wait_lock);
 		return 0;
@@ -255,6 +275,7 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
 	for (;;) {
 		/* Optimized out for rwlocks */
 		if (rwbase_signal_pending_state(state, current)) {
+			rwb->waiter_timeout = 0;
 			rwbase_restore_current_state();
 			__rwbase_write_unlock(rwb, 0, flags);
 			trace_contention_end(rwb, -EINTR);
@@ -264,12 +285,20 @@ static int __sched rwbase_write_lock(struct rwbase_rt *rwb,
 		if (__rwbase_write_trylock(rwb))
 			break;
 
+		/*
+		 * Record timeout when reader bias is ignored. Ensure timeout
+		 * is at least 1 in case of overflow.
+		 */
+		rwb->waiter_timeout = (jiffies + RWBASE_RT_WAIT_TIMEOUT) | 1;
+
 		raw_spin_unlock_irqrestore(&rtm->wait_lock, flags);
 		rwbase_schedule();
 		raw_spin_lock_irqsave(&rtm->wait_lock, flags);
 		set_current_state(state);
 	}
 
+
+	rwb->waiter_timeout = 0;
 	rwbase_restore_current_state();
 	trace_contention_end(rwb, 0);
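As a quick sanity check of the "4ms or 1 tick" comment in the hunk above:
RWBASE_RT_WAIT_TIMEOUT evaluates to 4 jiffies on HZ=1000 and to a single,
longer tick on HZ=100 or HZ=250. The snippet below is a user-space check with
DIV_ROUND_UP written out locally, since it builds outside the kernel tree.

#include <stdio.h>

#define DIV_ROUND_UP(n, d)	(((n) + (d) - 1) / (d))

int main(void)
{
	const int hz_values[] = { 100, 250, 300, 1000 };

	for (unsigned int i = 0; i < sizeof(hz_values) / sizeof(hz_values[0]); i++) {
		int hz = hz_values[i];
		int timeout = DIV_ROUND_UP(hz, 250);	/* RWBASE_RT_WAIT_TIMEOUT */

		printf("HZ=%4d: %d jiffies = %.2f ms\n",
		       hz, timeout, timeout * 1000.0 / hz);
	}
	return 0;
}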