R/W lock implementation:

* Changed the rw_lock_{read,write}_unlock() return values to void. They
  returned a value != B_OK only in case of user error and no-one checked them
  anyway.
* Optimized rw_lock_read_[un]lock(). They are inline now and, as long as
  there is no contending write locker, they only perform an atomic_add()
  (see the counter sketch further below).
* Changed the semantics of nested locking after acquiring a write lock: read
  and write locks are now counted separately, so read locks no longer
  implicitly become write locks. This makes, for example, degrading a write
  lock to a read lock via read_lock + write_unlock (as used in the VM)
  actually work; see the sketch right below this list.
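
Below is a minimal, hedged sketch of that degradation pattern (mirroring what
AddressSpaceWriteLocker::DegradeToReadLock() does in the VM diff further down).
The lock, its name, and the surrounding function are made up for illustration;
the API is the one declared in the header diff below.

// Hedged sketch (not taken from the commit): degrading a write lock to a
// read lock under the new semantics.
#include <lock.h>	// assumption: the private kernel header declaring rw_lock

static rw_lock sExampleLock;

static void
degrade_example(void)
{
	rw_lock_init(&sExampleLock, "example lock");

	rw_lock_write_lock(&sExampleLock);
	// ... modify the protected data exclusively ...

	// Take a nested read lock, then drop the write lock. Since read and
	// write locks are now counted separately, the read lock survives the
	// write unlock instead of silently having been a write lock all along.
	rw_lock_read_lock(&sExampleLock);
	rw_lock_write_unlock(&sExampleLock);

	// ... other readers may run concurrently while we keep reading ...

	rw_lock_read_unlock(&sExampleLock);
	rw_lock_destroy(&sExampleLock);
}

Previously the nested read_lock on a write-locked lock only bumped the writer's
owner count, so after write_unlock the caller effectively still held the write
lock (cf. the TODO removed in the VM file below).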

These changes speed up the -j8 Haiku image build on my machine by a few
percent, but, more interestingly, they reduce the total kernel time by 25%.
Apparently we now get more contention on other locks.
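
For reference, a hedged sketch of the combined counter that the single
atomic_add() fast path relies on, inferred from the header diff below (the
"count" field and RW_LOCK_WRITER_COUNT_BASE); the helper names are made up:

// Readers add 1 to count, writers add RW_LOCK_WRITER_COUNT_BASE (0x10000),
// so one word encodes both kinds of lockers and can be updated with a
// single atomic_add(). Illustrative helpers, not part of the API:
#include <SupportDefs.h>	// assumption: defines int32

#define RW_LOCK_WRITER_COUNT_BASE	0x10000

static inline int32
example_reader_count(int32 count)
{
	// low 16 bits: readers that have announced themselves
	return count % RW_LOCK_WRITER_COUNT_BASE;
}

static inline int32
example_writer_count(int32 count)
{
	// upper bits: writers that have announced themselves
	return count / RW_LOCK_WRITER_COUNT_BASE;
}

// Read fast path: if atomic_add(&lock->count, 1) returns a value below
// RW_LOCK_WRITER_COUNT_BASE, no writer is involved and the read lock is
// held without ever taking the thread spinlock.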


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@34830 a95241bf-73f2-0310-859d-f6bbb57e9c96
Ingo Weinhold 2009-12-31 17:03:41 +00:00
parent 4ccd636dcb
commit 2ea2527fe4
4 changed files with 290 additions and 156 deletions

Changed file 1 of 4

@@ -1,5 +1,5 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Copyright 2008-2009, Ingo Weinhold, ingo_weinhold@gmx.de.
* Copyright 2002-2009, Axel Dörfler, axeld@pinc-software.de.
* Distributed under the terms of the MIT License.
*
@@ -45,12 +45,21 @@ typedef struct rw_lock {
const char* name;
struct rw_lock_waiter* waiters;
thread_id holder;
int32 reader_count;
int32 writer_count;
vint32 count;
int32 owner_count;
int16 active_readers;
// Only > 0 while a writer is waiting: number
// of active readers when the first waiting
// writer started waiting.
int16 pending_readers;
// Number of readers that have already
// incremented "count", but have not yet started
// to wait at the time the last writer unlocked.
uint32 flags;
} rw_lock;
#define RW_LOCK_WRITER_COUNT_BASE 0x10000
#define RW_LOCK_FLAG_CLONE_NAME 0x1
@@ -114,10 +123,8 @@ extern void rw_lock_init(rw_lock* lock, const char* name);
// name is *not* cloned nor freed in rw_lock_destroy()
extern void rw_lock_init_etc(rw_lock* lock, const char* name, uint32 flags);
extern void rw_lock_destroy(rw_lock* lock);
extern status_t rw_lock_read_lock(rw_lock* lock);
extern status_t rw_lock_read_unlock(rw_lock* lock);
extern status_t rw_lock_write_lock(rw_lock* lock);
extern status_t rw_lock_write_unlock(rw_lock* lock);
extern void rw_lock_write_unlock(rw_lock* lock);
extern void mutex_init(mutex* lock, const char* name);
// name is *not* cloned nor freed in mutex_destroy()
@@ -129,7 +136,12 @@ extern status_t mutex_switch_lock(mutex* from, mutex* to);
// to, the operation is safe as long as "from" is held while destroying
// "to".
// implementation private:
extern status_t _rw_lock_read_lock(rw_lock* lock);
extern void _rw_lock_read_unlock(rw_lock* lock);
extern status_t _mutex_lock(mutex* lock, bool threadsLocked);
extern void _mutex_unlock(mutex* lock, bool threadsLocked);
extern status_t _mutex_trylock(mutex* lock);
@@ -137,6 +149,33 @@ extern status_t _mutex_lock_with_timeout(mutex* lock, uint32 timeoutFlags,
bigtime_t timeout);
static inline status_t
rw_lock_read_lock(rw_lock* lock)
{
#if KDEBUG_RW_LOCK_DEBUG
return rw_lock_write_lock(lock);
#else
int32 oldCount = atomic_add(&lock->count, 1);
if (oldCount >= RW_LOCK_WRITER_COUNT_BASE)
return _rw_lock_read_lock(lock);
return B_OK;
#endif
}
static inline void
rw_lock_read_unlock(rw_lock* lock)
{
#if KDEBUG_RW_LOCK_DEBUG
rw_lock_write_unlock(lock);
#else
int32 oldCount = atomic_add(&lock->count, -1);
if (oldCount >= RW_LOCK_WRITER_COUNT_BASE)
_rw_lock_read_unlock(lock);
#endif
}
static inline status_t
mutex_lock(mutex* lock)
{

Changed file 2 of 4

@@ -1,5 +1,5 @@
/*
* Copyright 2008, Ingo Weinhold, ingo_weinhold@gmx.de.
* Copyright 2008-2009, Ingo Weinhold, ingo_weinhold@gmx.de.
* Copyright 2002-2009, Axel Dörfler, axeld@pinc-software.de. All rights reserved.
* Distributed under the terms of the MIT License.
*
@@ -170,43 +170,50 @@ rw_lock_wait(rw_lock* lock, bool writer)
}
static void
static int32
rw_lock_unblock(rw_lock* lock)
{
// Check whether there are any waiting threads at all and whether anyone
// has the write lock.
rw_lock_waiter* waiter = lock->waiters;
if (waiter == NULL || lock->holder > 0)
return;
if (waiter == NULL || lock->holder >= 0)
return 0;
// writer at head of queue?
if (waiter->writer) {
if (lock->reader_count == 0) {
// dequeue writer
lock->waiters = waiter->next;
if (lock->waiters != NULL)
lock->waiters->last = waiter->last;
if (lock->active_readers > 0 || lock->pending_readers > 0)
return 0;
lock->holder = waiter->thread->id;
// dequeue writer
lock->waiters = waiter->next;
if (lock->waiters != NULL)
lock->waiters->last = waiter->last;
// unblock thread
thread_unblock_locked(waiter->thread, B_OK);
}
return;
lock->holder = waiter->thread->id;
// unblock thread
thread_unblock_locked(waiter->thread, B_OK);
return RW_LOCK_WRITER_COUNT_BASE;
}
// wake up one or more readers
while ((waiter = lock->waiters) != NULL && !waiter->writer) {
uint32 readerCount = 0;
do {
// dequeue reader
lock->waiters = waiter->next;
if (lock->waiters != NULL)
lock->waiters->last = waiter->last;
lock->reader_count++;
readerCount++;
// unblock thread
thread_unblock_locked(waiter->thread, B_OK);
}
} while ((waiter = lock->waiters) != NULL && !waiter->writer);
if (lock->count >= RW_LOCK_WRITER_COUNT_BASE)
lock->active_readers += readerCount;
return readerCount;
}
@@ -216,9 +223,10 @@ rw_lock_init(rw_lock* lock, const char* name)
lock->name = name;
lock->waiters = NULL;
lock->holder = -1;
lock->reader_count = 0;
lock->writer_count = 0;
lock->count = 0;
lock->owner_count = 0;
lock->active_readers = 0;
lock->pending_readers = 0;
lock->flags = 0;
T_SCHEDULING_ANALYSIS(InitRWLock(lock, name));
@@ -232,9 +240,10 @@ rw_lock_init_etc(rw_lock* lock, const char* name, uint32 flags)
lock->name = (flags & RW_LOCK_FLAG_CLONE_NAME) != 0 ? strdup(name) : name;
lock->waiters = NULL;
lock->holder = -1;
lock->reader_count = 0;
lock->writer_count = 0;
lock->count = 0;
lock->owner_count = 0;
lock->active_readers = 0;
lock->pending_readers = 0;
lock->flags = flags & RW_LOCK_FLAG_CLONE_NAME;
T_SCHEDULING_ANALYSIS(InitRWLock(lock, name));
@@ -253,7 +262,7 @@ rw_lock_destroy(rw_lock* lock)
#if KDEBUG
if (lock->waiters != NULL && thread_get_current_thread_id()
!= lock->holder) {
!= lock->holder) {
panic("rw_lock_destroy(): there are blocking threads, but the caller "
"doesn't hold the write lock (%p)", lock);
@@ -280,89 +289,106 @@ rw_lock_destroy(rw_lock* lock)
}
#if !KDEBUG_RW_LOCK_DEBUG
status_t
rw_lock_read_lock(rw_lock* lock)
_rw_lock_read_lock(rw_lock* lock)
{
#if KDEBUG_RW_LOCK_DEBUG
return rw_lock_write_lock(lock);
#else
InterruptsSpinLocker locker(gThreadSpinlock);
if (lock->writer_count == 0) {
lock->reader_count++;
return B_OK;
}
// We might be the writer ourselves.
if (lock->holder == thread_get_current_thread_id()) {
lock->owner_count++;
return B_OK;
}
return rw_lock_wait(lock, false);
#endif
}
// The writer that originally had the lock when we called atomic_add() might
// already have gone and another writer could have overtaken us. In this
// case the original writer set pending_readers, so we know that we don't
// have to wait.
if (lock->pending_readers > 0) {
lock->pending_readers--;
if (lock->count >= RW_LOCK_WRITER_COUNT_BASE)
lock->active_readers++;
status_t
rw_lock_read_unlock(rw_lock* lock)
{
#if KDEBUG_RW_LOCK_DEBUG
return rw_lock_write_unlock(lock);
#else
InterruptsSpinLocker locker(gThreadSpinlock);
if (lock->holder == thread_get_current_thread_id()) {
if (--lock->owner_count > 0)
return B_OK;
// this originally has been a write lock
lock->writer_count--;
lock->holder = -1;
rw_lock_unblock(lock);
return B_OK;
}
if (lock->reader_count <= 0) {
panic("rw_lock_read_unlock(): lock %p not read-locked", lock);
return B_BAD_VALUE;
ASSERT(lock->count >= RW_LOCK_WRITER_COUNT_BASE);
// we need to wait
return rw_lock_wait(lock, false);
}
void
_rw_lock_read_unlock(rw_lock* lock)
{
InterruptsSpinLocker locker(gThreadSpinlock);
// If we're still holding the write lock or if there are other readers,
// no-one can be woken up.
if (lock->holder == thread_get_current_thread_id()) {
ASSERT(lock->owner_count % RW_LOCK_WRITER_COUNT_BASE > 0);
lock->owner_count--;
return;
}
lock->reader_count--;
if (--lock->active_readers > 0)
return;
if (lock->active_readers < 0) {
panic("rw_lock_read_unlock(): lock %p not read-locked", lock);
lock->active_readers = 0;
return;
}
rw_lock_unblock(lock);
return B_OK;
#endif
}
#endif // !KDEBUG_RW_LOCK_DEBUG
status_t
rw_lock_write_lock(rw_lock* lock)
{
InterruptsSpinLocker locker(gThreadSpinlock);
if (lock->reader_count == 0 && lock->writer_count == 0) {
lock->writer_count++;
lock->holder = thread_get_current_thread_id();
lock->owner_count = 1;
return B_OK;
}
if (lock->holder == thread_get_current_thread_id()) {
lock->owner_count++;
// If we're already the lock holder, we just need to increment the owner
// count.
thread_id thread = thread_get_current_thread_id();
if (lock->holder == thread) {
lock->owner_count += RW_LOCK_WRITER_COUNT_BASE;
return B_OK;
}
lock->writer_count++;
// announce our claim
int32 oldCount = atomic_add(&lock->count, RW_LOCK_WRITER_COUNT_BASE);
if (oldCount == 0) {
// No-one else held a read or write lock, so it's ours now.
lock->holder = thread;
lock->owner_count = RW_LOCK_WRITER_COUNT_BASE;
return B_OK;
}
// We have to wait. If we're the first writer, note the current reader
// count.
if (oldCount < RW_LOCK_WRITER_COUNT_BASE)
lock->active_readers = oldCount - lock->pending_readers;
status_t status = rw_lock_wait(lock, true);
if (status == B_OK) {
lock->holder = thread_get_current_thread_id();
lock->owner_count = 1;
lock->holder = thread;
lock->owner_count = RW_LOCK_WRITER_COUNT_BASE;
}
return status;
}
status_t
void
rw_lock_write_unlock(rw_lock* lock)
{
InterruptsSpinLocker locker(gThreadSpinlock);
@@ -370,17 +396,41 @@ rw_lock_write_unlock(rw_lock* lock)
if (thread_get_current_thread_id() != lock->holder) {
panic("rw_lock_write_unlock(): lock %p not write-locked by this thread",
lock);
return B_BAD_VALUE;
return;
}
if (--lock->owner_count > 0)
return B_OK;
lock->writer_count--;
ASSERT(lock->owner_count >= RW_LOCK_WRITER_COUNT_BASE);
lock->owner_count -= RW_LOCK_WRITER_COUNT_BASE;
if (lock->owner_count >= RW_LOCK_WRITER_COUNT_BASE)
return;
// We gave up our last write lock -- clean up and unblock waiters.
int32 readerCount = lock->owner_count;
lock->holder = -1;
lock->owner_count = 0;
rw_lock_unblock(lock);
int32 oldCount = atomic_add(&lock->count, -RW_LOCK_WRITER_COUNT_BASE);
oldCount -= RW_LOCK_WRITER_COUNT_BASE;
return B_OK;
if (oldCount != 0) {
// If writers are waiting, take over our reader count.
if (oldCount >= RW_LOCK_WRITER_COUNT_BASE) {
lock->active_readers = readerCount;
rw_lock_unblock(lock);
} else {
// No waiting writer, but there are one or more readers. We will
// unblock all waiting readers -- that's the easy part -- and must
// also make sure that all readers that haven't entered the critical
// section yet, won't start to wait. Otherwise a writer overtaking
// such a reader will correctly start to wait, but the reader,
// seeing the writer count > 0, would also start to wait. We set
// pending_readers to the number of readers that are still expected
// to enter the critical section.
lock->pending_readers = oldCount - readerCount
- rw_lock_unblock(lock);
}
}
}
@@ -402,9 +452,10 @@ dump_rw_lock_info(int argc, char** argv)
kprintf("rw lock %p:\n", lock);
kprintf(" name: %s\n", lock->name);
kprintf(" holder: %ld\n", lock->holder);
kprintf(" reader count: %ld\n", lock->reader_count);
kprintf(" writer count: %ld\n", lock->writer_count);
kprintf(" owner count: %ld\n", lock->owner_count);
kprintf(" count: %#lx\n", lock->count);
kprintf(" active readers %d\n", lock->active_readers);
kprintf(" pending readers %d\n", lock->pending_readers);
kprintf(" owner count: %#lx\n", lock->owner_count);
kprintf(" flags: %#lx\n", lock->flags);
kprintf(" waiting threads:");

Changed file 3 of 4

@@ -292,7 +292,6 @@ AddressSpaceWriteLocker::Unlock()
void
AddressSpaceWriteLocker::DegradeToReadLock()
{
// TODO: the current R/W lock implementation just keeps the write lock here
fSpace->ReadLock();
fSpace->WriteUnlock();
fDegraded = true;

Changed file 4 of 4

@@ -215,43 +215,50 @@ rw_lock_wait(rw_lock* lock, bool writer)
}
static void
static int32
rw_lock_unblock(rw_lock* lock)
{
// Check whether there are any waiting threads at all and whether anyone
// has the write lock.
rw_lock_waiter* waiter = lock->waiters;
if (waiter == NULL || lock->holder > 0)
return;
return 0;
// writer at head of queue?
if (waiter->writer) {
if (lock->reader_count == 0) {
// dequeue writer
lock->waiters = waiter->next;
if (lock->waiters != NULL)
lock->waiters->last = waiter->last;
if (lock->active_readers > 0 || lock->pending_readers > 0)
return 0;
lock->holder = get_thread_id(waiter->thread);
// dequeue writer
lock->waiters = waiter->next;
if (lock->waiters != NULL)
lock->waiters->last = waiter->last;
// unblock thread
_kern_unblock_thread(get_thread_id(waiter->thread), B_OK);
}
return;
lock->holder = get_thread_id(waiter->thread);
// unblock thread
_kern_unblock_thread(get_thread_id(waiter->thread), B_OK);
return RW_LOCK_WRITER_COUNT_BASE;
}
// wake up one or more readers
while ((waiter = lock->waiters) != NULL && !waiter->writer) {
uint32 readerCount = 0;
do {
// dequeue reader
lock->waiters = waiter->next;
if (lock->waiters != NULL)
lock->waiters->last = waiter->last;
lock->reader_count++;
readerCount++;
// unblock thread
_kern_unblock_thread(get_thread_id(waiter->thread), B_OK);
}
} while ((waiter = lock->waiters) != NULL && !waiter->writer);
if (lock->count >= RW_LOCK_WRITER_COUNT_BASE)
lock->active_readers += readerCount;
return readerCount;
}
@@ -261,9 +268,10 @@ rw_lock_init(rw_lock* lock, const char* name)
lock->name = name;
lock->waiters = NULL;
lock->holder = -1;
lock->reader_count = 0;
lock->writer_count = 0;
lock->count = 0;
lock->owner_count = 0;
lock->active_readers = 0;
lock->pending_readers = 0;
lock->flags = 0;
}
@@ -274,9 +282,10 @@ rw_lock_init_etc(rw_lock* lock, const char* name, uint32 flags)
lock->name = (flags & RW_LOCK_FLAG_CLONE_NAME) != 0 ? strdup(name) : name;
lock->waiters = NULL;
lock->holder = -1;
lock->reader_count = 0;
lock->writer_count = 0;
lock->count = 0;
lock->owner_count = 0;
lock->active_readers = 0;
lock->pending_readers = 0;
lock->flags = flags & RW_LOCK_FLAG_CLONE_NAME;
}
@@ -292,7 +301,7 @@ rw_lock_destroy(rw_lock* lock)
#if KDEBUG
if (lock->waiters != NULL && find_thread(NULL)
!= lock->holder) {
!= lock->holder) {
panic("rw_lock_destroy(): there are blocking threads, but the caller "
"doesn't hold the write lock (%p)", lock);
@@ -319,89 +328,103 @@ rw_lock_destroy(rw_lock* lock)
}
#if !KDEBUG_RW_LOCK_DEBUG
status_t
rw_lock_read_lock(rw_lock* lock)
_rw_lock_read_lock(rw_lock* lock)
{
#if KDEBUG_RW_LOCK_DEBUG
return rw_lock_write_lock(lock);
#else
AutoLocker<ThreadSpinlock> locker(sThreadSpinlock);
if (lock->writer_count == 0) {
lock->reader_count++;
return B_OK;
}
// We might be the writer ourselves.
if (lock->holder == find_thread(NULL)) {
lock->owner_count++;
return B_OK;
}
return rw_lock_wait(lock, false);
#endif
}
// The writer that originally had the lock when we called atomic_add() might
// already have gone and another writer could have overtaken us. In this
// case the original writer set pending_readers, so we know that we don't
// have to wait.
if (lock->pending_readers > 0) {
lock->pending_readers--;
if (lock->count >= RW_LOCK_WRITER_COUNT_BASE)
lock->active_readers++;
status_t
rw_lock_read_unlock(rw_lock* lock)
{
#if KDEBUG_RW_LOCK_DEBUG
return rw_lock_write_unlock(lock);
#else
AutoLocker<ThreadSpinlock> locker(sThreadSpinlock);
if (lock->holder == find_thread(NULL)) {
if (--lock->owner_count > 0)
return B_OK;
// this originally has been a write lock
lock->writer_count--;
lock->holder = -1;
rw_lock_unblock(lock);
return B_OK;
}
if (lock->reader_count <= 0) {
panic("rw_lock_read_unlock(): lock %p not read-locked", lock);
return B_BAD_VALUE;
// we need to wait
return rw_lock_wait(lock, false);
}
void
_rw_lock_read_unlock(rw_lock* lock)
{
AutoLocker<ThreadSpinlock> locker(sThreadSpinlock);
// If we're still holding the write lock or if there are other readers,
// no-one can be woken up.
if (lock->holder == find_thread(NULL)) {
lock->owner_count--;
return;
}
lock->reader_count--;
if (--lock->active_readers > 0)
return;
if (lock->active_readers < 0) {
panic("rw_lock_read_unlock(): lock %p not read-locked", lock);
lock->active_readers = 0;
return;
}
rw_lock_unblock(lock);
return B_OK;
#endif
}
#endif // !KDEBUG_RW_LOCK_DEBUG
status_t
rw_lock_write_lock(rw_lock* lock)
{
AutoLocker<ThreadSpinlock> locker(sThreadSpinlock);
if (lock->reader_count == 0 && lock->writer_count == 0) {
lock->writer_count++;
lock->holder = find_thread(NULL);
lock->owner_count = 1;
return B_OK;
}
if (lock->holder == find_thread(NULL)) {
lock->owner_count++;
// If we're already the lock holder, we just need to increment the owner
// count.
thread_id thread = find_thread(NULL);
if (lock->holder == thread) {
lock->owner_count += RW_LOCK_WRITER_COUNT_BASE;
return B_OK;
}
lock->writer_count++;
// announce our claim
int32 oldCount = atomic_add(&lock->count, RW_LOCK_WRITER_COUNT_BASE);
if (oldCount == 0) {
// No-one else held a read or write lock, so it's ours now.
lock->holder = thread;
lock->owner_count = RW_LOCK_WRITER_COUNT_BASE;
return B_OK;
}
// We have to wait. If we're the first writer, note the current reader
// count.
if (oldCount < RW_LOCK_WRITER_COUNT_BASE)
lock->active_readers = oldCount - lock->pending_readers;
status_t status = rw_lock_wait(lock, true);
if (status == B_OK) {
lock->holder = find_thread(NULL);
lock->owner_count = 1;
lock->holder = thread;
lock->owner_count = RW_LOCK_WRITER_COUNT_BASE;
}
return status;
}
status_t
void
rw_lock_write_unlock(rw_lock* lock)
{
AutoLocker<ThreadSpinlock> locker(sThreadSpinlock);
@@ -409,17 +432,39 @@ rw_lock_write_unlock(rw_lock* lock)
if (find_thread(NULL) != lock->holder) {
panic("rw_lock_write_unlock(): lock %p not write-locked by this thread",
lock);
return B_BAD_VALUE;
return;
}
if (--lock->owner_count > 0)
return B_OK;
lock->writer_count--;
lock->owner_count -= RW_LOCK_WRITER_COUNT_BASE;
if (lock->owner_count >= RW_LOCK_WRITER_COUNT_BASE)
return;
// We gave up our last write lock -- clean up and unblock waiters.
int32 readerCount = lock->owner_count;
lock->holder = -1;
lock->owner_count = 0;
rw_lock_unblock(lock);
int32 oldCount = atomic_add(&lock->count, -RW_LOCK_WRITER_COUNT_BASE);
oldCount -= RW_LOCK_WRITER_COUNT_BASE;
return B_OK;
if (oldCount != 0) {
// If writers are waiting, take over our reader count.
if (oldCount >= RW_LOCK_WRITER_COUNT_BASE) {
lock->active_readers = readerCount;
rw_lock_unblock(lock);
} else {
// No waiting writer, but there are one or more readers. We will
// unblock all waiting readers -- that's the easy part -- and must
// also make sure that all readers that haven't entered the critical
// section yet, won't start to wait. Otherwise a writer overtaking
// such a reader will correctly start to wait, but the reader,
// seeing the writer count > 0, would also start to wait. We set
// pending_readers to the number of readers that are still expected
// to enter the critical section.
lock->pending_readers = oldCount - readerCount
- rw_lock_unblock(lock);
}
}
}