Condition variables changes:

* Removed left-over commented C implementation.
* It is now possible for a thread to wait for more than one condition
  variable at a time.
* Made waiting for condition variables optionally interruptable.
* Renamed Notify() method to NotifyAll() and added a NotifyOne(), so
  that it is now possible to wake up only one of the waiting threads.
Pretty much untested at the moment.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@22081 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2007-08-26 23:53:12 +00:00
parent 61b50eed27
commit ff308b0b49
5 changed files with 250 additions and 195 deletions

View File

@ -15,22 +15,41 @@
class PrivateConditionVariable; class PrivateConditionVariable;
class PrivateConditionVariableEntry {
protected: struct PrivateConditionVariableEntry {
bool Add(const void* object); public:
void Wait(); inline PrivateConditionVariable* Variable() const
void Wait(const void* object); { return fVariable; }
inline PrivateConditionVariableEntry* ThreadNext() const
{ return fThreadNext; }
class Private;
protected: protected:
PrivateConditionVariableEntry* fNext; bool Add(const void* object,
PrivateConditionVariableEntry* threadNext);
void Wait(uint32 flags);
void Wait(const void* object, uint32 flags);
private:
void _Remove();
protected:
PrivateConditionVariableEntry* fVariableNext;
PrivateConditionVariable* fVariable; PrivateConditionVariable* fVariable;
struct thread* fThread; struct thread* fThread;
uint32 fFlags;
PrivateConditionVariableEntry* fThreadPrevious;
PrivateConditionVariableEntry* fThreadNext;
friend class PrivateConditionVariable; friend class PrivateConditionVariable;
friend class Private;
}; };
struct PrivateConditionVariable class PrivateConditionVariable
: protected HashTableLink<PrivateConditionVariable> { : protected HashTableLink<PrivateConditionVariable> {
public: public:
static void ListAll(); static void ListAll();
@ -40,10 +59,10 @@ protected:
void Publish(const void* object, void Publish(const void* object,
const char* objectType); const char* objectType);
void Unpublish(); void Unpublish();
void Notify(); void Notify(bool all);
private: private:
void _Notify(); void _Notify(bool all);
protected: protected:
const void* fObject; const void* fObject;
@ -62,16 +81,19 @@ public:
const char* objectType); const char* objectType);
inline void Unpublish(); inline void Unpublish();
inline void Notify(); inline void NotifyOne();
inline void NotifyAll();
}; };
template<typename Type> template<typename Type>
class ConditionVariableEntry : private PrivateConditionVariableEntry { class ConditionVariableEntry : public PrivateConditionVariableEntry {
public: public:
inline bool Add(const Type* object); inline bool Add(const Type* object,
inline void Wait(); PrivateConditionVariableEntry* threadNext
inline void Wait(const Type* object); = NULL);
inline void Wait(uint32 flags = 0);
inline void Wait(const Type* object, uint32 flags = 0);
private: private:
bool fAdded; bool fAdded;
@ -96,52 +118,52 @@ ConditionVariable<Type>::Unpublish()
template<typename Type> template<typename Type>
inline void inline void
ConditionVariable<Type>::Notify() ConditionVariable<Type>::NotifyOne()
{ {
PrivateConditionVariable::Notify(); PrivateConditionVariable::Notify(false);
}
template<typename Type>
inline void
ConditionVariable<Type>::NotifyAll()
{
PrivateConditionVariable::Notify(true);
} }
template<typename Type> template<typename Type>
inline bool inline bool
ConditionVariableEntry<Type>::Add(const Type* object) ConditionVariableEntry<Type>::Add(const Type* object,
PrivateConditionVariableEntry* threadNext)
{ {
return PrivateConditionVariableEntry::Add(object); return PrivateConditionVariableEntry::Add(object, threadNext);
} }
template<typename Type> template<typename Type>
inline void inline void
ConditionVariableEntry<Type>::Wait() ConditionVariableEntry<Type>::Wait(uint32 flags)
{ {
PrivateConditionVariableEntry::Wait(); PrivateConditionVariableEntry::Wait(flags);
} }
template<typename Type> template<typename Type>
inline void inline void
ConditionVariableEntry<Type>::Wait(const Type* object) ConditionVariableEntry<Type>::Wait(const Type* object, uint32 flags)
{ {
PrivateConditionVariableEntry::Wait(object); PrivateConditionVariableEntry::Wait(object, flags);
} }
#if 0
extern void publish_stack_condition_variable(condition_variable* variable,
const void* object, const char* objectType);
extern void unpublish_condition_variable(const void* object);
extern void notify_condition_variable(const void* object);
extern void wait_for_condition_variable(const void* object);
extern bool add_condition_variable_entry(const void* object,
condition_variable_entry* entry);
extern void wait_for_condition_variable_entry(
condition_variable_entry* entry);
#endif // 0
extern "C" { extern "C" {
#endif // __cplusplus #endif // __cplusplus
struct thread;
extern status_t condition_variable_interrupt_thread(struct thread* thread);
extern void condition_variable_init(); extern void condition_variable_init();
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -155,7 +155,7 @@ struct thread {
int32 flags; int32 flags;
} sem; } sem;
struct PrivateConditionVariable *condition_variable; struct PrivateConditionVariableEntry *condition_variable_entry;
struct { struct {
sem_id write_sem; sem_id write_sem;

View File

@ -11,6 +11,7 @@
#include <debug.h> #include <debug.h>
#include <kscheduler.h> #include <kscheduler.h>
#include <ksignal.h>
#include <int.h> #include <int.h>
#include <thread.h> #include <thread.h>
#include <util/AutoLock.h> #include <util/AutoLock.h>
@ -89,18 +90,29 @@ dump_condition_variable(int argc, char** argv)
bool bool
PrivateConditionVariableEntry::Add(const void* object) PrivateConditionVariableEntry::Add(const void* object,
PrivateConditionVariableEntry* threadNext)
{ {
ASSERT(object != NULL); ASSERT(object != NULL);
fThread = thread_get_current_thread(); fThread = thread_get_current_thread();
fFlags = 0;
InterruptsLocker _; InterruptsLocker _;
SpinLocker locker(sConditionVariablesLock); SpinLocker locker(sConditionVariablesLock);
// add to the list of entries for this thread
fThreadNext = threadNext;
if (threadNext) {
fThreadPrevious = threadNext->fThreadPrevious;
threadNext->fThreadPrevious = this;
} else
fThreadPrevious = NULL;
// add to the queue for the variable
fVariable = sConditionVariableHash.Lookup(object); fVariable = sConditionVariableHash.Lookup(object);
if (fVariable) { if (fVariable) {
fNext = fVariable->fEntries; fVariableNext = fVariable->fEntries;
fVariable->fEntries = this; fVariable->fEntries = this;
} }
@ -109,7 +121,7 @@ PrivateConditionVariableEntry::Add(const void* object)
void void
PrivateConditionVariableEntry::Wait() PrivateConditionVariableEntry::Wait(uint32 flags)
{ {
if (!are_interrupts_enabled()) { if (!are_interrupts_enabled()) {
panic("wait_for_condition_variable_entry() called with interrupts " panic("wait_for_condition_variable_entry() called with interrupts "
@ -120,28 +132,89 @@ PrivateConditionVariableEntry::Wait()
InterruptsLocker _; InterruptsLocker _;
SpinLocker locker(sConditionVariablesLock); SpinLocker locker(sConditionVariablesLock);
if (fVariable != NULL) { // get first entry for this thread
PrivateConditionVariableEntry* firstEntry = this;
while (firstEntry->fThreadPrevious)
firstEntry = firstEntry->fThreadPrevious;
// check whether any entry has already been notified
// (set the flags while at it)
PrivateConditionVariableEntry* entry = firstEntry;
while (entry) {
if (entry->fVariable == NULL)
return;
entry->fFlags = flags;
entry = entry->fThreadNext;
}
// all entries are unnotified -- wait
struct thread* thread = thread_get_current_thread(); struct thread* thread = thread_get_current_thread();
thread->next_state = B_THREAD_WAITING; thread->next_state = B_THREAD_WAITING;
thread->condition_variable = fVariable; thread->condition_variable_entry = firstEntry;
thread->sem.blocking = -1; thread->sem.blocking = -1;
GRAB_THREAD_LOCK(); GRAB_THREAD_LOCK();
locker.Unlock(); locker.Unlock();
scheduler_reschedule(); scheduler_reschedule();
RELEASE_THREAD_LOCK(); RELEASE_THREAD_LOCK();
}
} }
void void
PrivateConditionVariableEntry::Wait(const void* object) PrivateConditionVariableEntry::Wait(const void* object, uint32 flags)
{ {
if (Add(object)) if (Add(object, NULL))
Wait(object); Wait(flags);
} }
/*! Removes the entry from its variable.
Interrupts must be disabled, sConditionVariablesLock must be held.
*/
void
PrivateConditionVariableEntry::_Remove()
{
if (!fVariable)
return;
// fast path, if we're first in queue
if (this == fVariable->fEntries) {
fVariable->fEntries = fVariableNext;
fVariableNext = NULL;
return;
}
// we're not the first entry -- find our previous entry
PrivateConditionVariableEntry* entry = fVariable->fEntries;
while (entry->fVariableNext) {
if (this == entry->fVariableNext) {
entry->fVariableNext = fVariableNext;
fVariableNext = NULL;
return;
}
entry = entry->fVariableNext;
}
}
class PrivateConditionVariableEntry::Private {
public:
inline Private(PrivateConditionVariableEntry& entry)
: fEntry(entry)
{
}
inline uint32 Flags() const { return fEntry.fFlags; }
inline void Remove() const { fEntry._Remove(); }
private:
PrivateConditionVariableEntry& fEntry;
};
// #pragma mark - PrivateConditionVariable // #pragma mark - PrivateConditionVariable
@ -157,7 +230,7 @@ PrivateConditionVariable::ListAll()
PrivateConditionVariableEntry* entry = variable->fEntries; PrivateConditionVariableEntry* entry = variable->fEntries;
while (entry) { while (entry) {
count++; count++;
entry = entry->fNext; entry = entry->fVariableNext;
} }
kprintf("%p %p %-20s %15d\n", variable, variable->fObject, kprintf("%p %p %-20s %15d\n", variable, variable->fObject,
@ -175,7 +248,7 @@ PrivateConditionVariable::Dump()
PrivateConditionVariableEntry* entry = fEntries; PrivateConditionVariableEntry* entry = fEntries;
while (entry) { while (entry) {
kprintf(" %ld", entry->fThread->id); kprintf(" %ld", entry->fThread->id);
entry = entry->fNext; entry = entry->fVariableNext;
} }
kprintf("\n"); kprintf("\n");
} }
@ -221,12 +294,12 @@ PrivateConditionVariable::Unpublish()
fObjectType = NULL; fObjectType = NULL;
if (fEntries) if (fEntries)
_Notify(); _Notify(true);
} }
void void
PrivateConditionVariable::Notify() PrivateConditionVariable::Notify(bool all)
{ {
ASSERT(fObject != NULL); ASSERT(fObject != NULL);
@ -242,169 +315,105 @@ PrivateConditionVariable::Notify()
#endif #endif
if (fEntries) if (fEntries)
_Notify(); _Notify(all);
} }
//! Called with interrupts disabled and the condition variable spinlock held. //! Called with interrupts disabled and the condition variable spinlock held.
void void
PrivateConditionVariable::_Notify() PrivateConditionVariable::_Notify(bool all)
{ {
// dequeue and wake up the blocked threads // dequeue and wake up the blocked threads
GRAB_THREAD_LOCK(); GRAB_THREAD_LOCK();
while (PrivateConditionVariableEntry* entry = fEntries) { while (PrivateConditionVariableEntry* entry = fEntries) {
fEntries = entry->fNext; fEntries = entry->fVariableNext;
struct thread* thread = entry->fThread;
entry->fNext = NULL; entry->fVariableNext = NULL;
entry->fVariable = NULL; entry->fVariable = NULL;
entry->fThread->condition_variable = NULL; // remove other entries of this thread from their respective variables
if (entry->fThread->state == B_THREAD_WAITING) { PrivateConditionVariableEntry* otherEntry = entry->fThreadPrevious;
entry->fThread->state = B_THREAD_READY; while (otherEntry) {
scheduler_enqueue_in_run_queue(entry->fThread); otherEntry->_Remove();
otherEntry = otherEntry->fThreadPrevious;
} }
otherEntry = entry->fThreadNext;
while (otherEntry) {
otherEntry->_Remove();
otherEntry = otherEntry->fThreadNext;
}
// wake up the thread
thread->condition_variable_entry = NULL;
if (thread->state == B_THREAD_WAITING) {
thread->state = B_THREAD_READY;
scheduler_enqueue_in_run_queue(thread);
}
if (!all)
break;
} }
RELEASE_THREAD_LOCK(); RELEASE_THREAD_LOCK();
} }
#if 0 // #pragma mark -
void
publish_stack_condition_variable(condition_variable* variable, /*! Interrupts must be disabled, thread lock must be held. Note, that the thread
const void* object, const char* objectType) lock may temporarily be released.
*/
status_t
condition_variable_interrupt_thread(struct thread* thread)
{ {
ASSERT(variable != NULL); if (thread == NULL || thread->state != B_THREAD_WAITING
ASSERT(object != NULL); || thread->condition_variable_entry == NULL) {
return B_BAD_VALUE;
variable->object = object;
variable->object_type = objectType;
variable->entries = NULL;
InterruptsLocker _;
SpinLocker locker(sConditionVariablesLock);
ASSERT_PRINT(sConditionVariableHash.Lookup(object) == NULL,
"condition variable: %p\n", sConditionVariableHash.Lookup(object));
sConditionVariableHash.InsertUnchecked(variable);
}
void
unpublish_condition_variable(const void* object)
{
ASSERT(object != NULL);
InterruptsLocker _;
SpinLocker locker(sConditionVariablesLock);
condition_variable* variable = sConditionVariableHash.Lookup(object);
condition_variable_entry* entries = NULL;
if (variable) {
sConditionVariableHash.RemoveUnchecked(variable);
entries = variable->entries;
variable->object = NULL;
variable->object_type = NULL;
variable->entries = NULL;
} else {
panic("No condition variable for %p\n", object);
} }
if (entries) thread_id threadID = thread->id;
notify_condition_variable_entries(entries);
}
// We also need the condition variable spin lock, so, in order to respect
void // the locking order, we must temporarily release the thread lock.
notify_condition_variable(const void* object) RELEASE_THREAD_LOCK();
{
ASSERT(object != NULL);
InterruptsLocker _;
SpinLocker locker(sConditionVariablesLock); SpinLocker locker(sConditionVariablesLock);
condition_variable* variable = sConditionVariableHash.Lookup(object);
condition_variable_entry* entries = NULL;
if (variable) {
entries = variable->entries;
variable->entries = NULL;
} else {
panic("No condition variable for %p\n", object);
}
locker.Unlock();
if (entries)
notify_condition_variable_entries(entries);
}
void
wait_for_condition_variable(const void* object)
{
condition_variable_entry entry;
if (add_condition_variable_entry(object, &entry))
wait_for_condition_variable_entry(&entry);
}
bool
add_condition_variable_entry(const void* object,
condition_variable_entry* entry)
{
ASSERT(object != NULL);
ASSERT(entry != NULL);
entry->thread = thread_get_current_thread();
InterruptsLocker _;
SpinLocker locker(sConditionVariablesLock);
condition_variable* variable = sConditionVariableHash.Lookup(object);
if (variable) {
entry->variable = variable;
entry->next = variable->entries;
variable->entries = entry;
} else
entry->variable = NULL;
return (variable != NULL);
}
void
wait_for_condition_variable_entry(condition_variable_entry* entry)
{
ASSERT(entry != NULL);
if (!are_interrupts_enabled()) {
panic("wait_for_condition_variable_entry() called with interrupts "
"disabled");
return;
}
InterruptsLocker _;
SpinLocker locker(sConditionVariablesLock);
condition_variable* variable = entry->variable;
if (variable != NULL) {
struct thread* thread = thread_get_current_thread();
thread->next_state = B_THREAD_WAITING;
thread->condition_variable = variable;
thread->sem.blocking = -1;
}
if (variable != NULL) {
GRAB_THREAD_LOCK(); GRAB_THREAD_LOCK();
locker.Unlock();
scheduler_reschedule(); // re-get the thread and do the checks again
RELEASE_THREAD_LOCK(); thread = thread_get_thread_struct_locked(threadID);
if (thread != NULL || thread->state != B_THREAD_WAITING
|| thread->condition_variable_entry == NULL) {
return B_BAD_VALUE;
} }
PrivateConditionVariableEntry* entry = thread->condition_variable_entry;
uint32 flags = PrivateConditionVariableEntry::Private(*entry).Flags();
// interruptable?
if ((flags & B_CAN_INTERRUPT) == 0
&& ((flags & B_KILL_CAN_INTERRUPT) == 0
|| (thread->sig_pending & KILL_SIGNALS) == 0)) {
return B_NOT_ALLOWED;
}
// remove all of the thread's entries from their variables
while (entry) {
PrivateConditionVariableEntry::Private(*entry).Remove();
entry = entry->ThreadNext();
}
// wake up the thread
thread->condition_variable_entry = NULL;
thread->state = B_THREAD_READY;
scheduler_enqueue_in_run_queue(thread);
return B_OK;
} }
#endif
void void

View File

@ -10,6 +10,7 @@
#include <OS.h> #include <OS.h>
#include <KernelExport.h> #include <KernelExport.h>
#include <condition_variable.h>
#include <debug.h> #include <debug.h>
#include <kernel.h> #include <kernel.h>
#include <kscheduler.h> #include <kscheduler.h>
@ -221,12 +222,26 @@ is_kill_signal_pending(void)
} }
/** Delivers the \a signal to the \a thread, but doesn't handle the signal - /*! Tries to interrupt a thread waiting for a semaphore or a condition variable.
* it just makes sure the thread gets the signal, ie. unblocks it if needed. Interrupts must be disabled, the thread lock be held. Note, that the thread
* This function must be called with interrupts disabled and the lock may temporarily be released.
* thread lock held. */
*/ static status_t
signal_interrupt_thread(struct thread* thread)
{
if (thread->sem.blocking >= 0)
return sem_interrupt_thread(thread);
else if (thread->condition_variable_entry)
return condition_variable_interrupt_thread(thread);
return B_BAD_VALUE;
}
/*! Delivers the \a signal to the \a thread, but doesn't handle the signal -
it just makes sure the thread gets the signal, ie. unblocks it if needed.
This function must be called with interrupts disabled and the
thread lock held.
*/
static status_t static status_t
deliver_signal(struct thread *thread, uint signal, uint32 flags) deliver_signal(struct thread *thread, uint signal, uint32 flags)
{ {
@ -260,7 +275,7 @@ deliver_signal(struct thread *thread, uint signal, uint32 flags)
mainThread->state = mainThread->next_state = B_THREAD_READY; mainThread->state = mainThread->next_state = B_THREAD_READY;
scheduler_enqueue_in_run_queue(mainThread); scheduler_enqueue_in_run_queue(mainThread);
} else if (mainThread->state == B_THREAD_WAITING) } else if (mainThread->state == B_THREAD_WAITING)
sem_interrupt_thread(mainThread); signal_interrupt_thread(mainThread);
// Supposed to fall through // Supposed to fall through
} }
@ -270,7 +285,7 @@ deliver_signal(struct thread *thread, uint signal, uint32 flags)
thread->state = thread->next_state = B_THREAD_READY; thread->state = thread->next_state = B_THREAD_READY;
scheduler_enqueue_in_run_queue(thread); scheduler_enqueue_in_run_queue(thread);
} else if (thread->state == B_THREAD_WAITING) } else if (thread->state == B_THREAD_WAITING)
sem_interrupt_thread(thread); signal_interrupt_thread(thread);
break; break;
case SIGCONT: case SIGCONT:
@ -289,7 +304,7 @@ deliver_signal(struct thread *thread, uint signal, uint32 flags)
& (~thread->sig_block_mask | SIGNAL_TO_MASK(SIGCHLD))) { & (~thread->sig_block_mask | SIGNAL_TO_MASK(SIGCHLD))) {
// Interrupt thread if it was waiting // Interrupt thread if it was waiting
if (thread->state == B_THREAD_WAITING) if (thread->state == B_THREAD_WAITING)
sem_interrupt_thread(thread); signal_interrupt_thread(thread);
} }
break; break;
} }

View File

@ -11,6 +11,7 @@
#include <OS.h> #include <OS.h>
#include <condition_variable.h>
#include <cpu.h> #include <cpu.h>
#include <int.h> #include <int.h>
#include <kimage.h> #include <kimage.h>
@ -222,7 +223,7 @@ create_thread_struct(struct thread *inthread, const char *name,
thread->team = NULL; thread->team = NULL;
thread->cpu = cpu; thread->cpu = cpu;
thread->sem.blocking = -1; thread->sem.blocking = -1;
thread->condition_variable = NULL; thread->condition_variable_entry = NULL;
thread->fault_handler = 0; thread->fault_handler = 0;
thread->page_faults_allowed = 1; thread->page_faults_allowed = 1;
thread->kernel_stack_area = -1; thread->kernel_stack_area = -1;
@ -871,7 +872,15 @@ _dump_thread_info(struct thread *thread)
kprintf(" sem.count: 0x%lx\n", thread->sem.count); kprintf(" sem.count: 0x%lx\n", thread->sem.count);
kprintf(" sem.acquire_status: 0x%lx\n", thread->sem.acquire_status); kprintf(" sem.acquire_status: 0x%lx\n", thread->sem.acquire_status);
kprintf(" sem.flags: 0x%lx\n", thread->sem.flags); kprintf(" sem.flags: 0x%lx\n", thread->sem.flags);
kprintf("condition variable: %p\n", thread->condition_variable);
kprintf("condition variables:");
PrivateConditionVariableEntry* entry = thread->condition_variable_entry;
while (entry != NULL) {
kprintf(" %p", entry->Variable());
entry = entry->ThreadNext();
}
kprintf("\n");
kprintf("fault_handler: %p\n", (void *)thread->fault_handler); kprintf("fault_handler: %p\n", (void *)thread->fault_handler);
kprintf("args: %p %p\n", thread->args1, thread->args2); kprintf("args: %p %p\n", thread->args1, thread->args2);
kprintf("entry: %p\n", (void *)thread->entry); kprintf("entry: %p\n", (void *)thread->entry);
@ -993,8 +1002,8 @@ dump_thread_list(int argc, char **argv)
// does it block on a semaphore or a condition variable? // does it block on a semaphore or a condition variable?
if (thread->state == B_THREAD_WAITING) { if (thread->state == B_THREAD_WAITING) {
if (thread->condition_variable) if (thread->condition_variable_entry)
kprintf("%p ", thread->condition_variable); kprintf("%p ", thread->condition_variable_entry->Variable());
else else
kprintf("%10lx ", thread->sem.blocking); kprintf("%10lx ", thread->sem.blocking);
} else } else