Reimplement unnamed POSIX semaphores using user_mutex

* Fixes sharing semantics, so non-shared semaphores in non-shared
  memory do not become shared after a fork.
* Adds two new system calls: _user_mutex_sem_acquire/release(),
  which reuse the user_mutex address-hashed wait mechanism.
* Named semaphores continue to use traditional sem_id semaphores.
This commit is contained in:
Hamish Morrison 2015-05-11 21:14:52 +01:00
parent 03796a0cef
commit d6d439f3f7
6 changed files with 286 additions and 291 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2008-2012 Haiku, Inc.
* Copyright 2008-2015 Haiku, Inc.
* Distributed under the terms of the MIT License.
*/
#ifndef _SEMAPHORE_H_
@ -13,8 +13,12 @@
typedef struct _sem_t {
int32_t id;
int32_t _padding[3];
int32_t type;
union {
int32_t named_sem_id;
int32_t unnamed_sem;
} u;
int32_t padding[2];
} sem_t;
#define SEM_FAILED ((sem_t*)(long)-1)

View File

@ -20,6 +20,9 @@ status_t _user_mutex_lock(int32* mutex, const char* name, uint32 flags,
status_t _user_mutex_unlock(int32* mutex, uint32 flags);
status_t _user_mutex_switch_lock(int32* fromMutex, int32* toMutex,
const char* name, uint32 flags, bigtime_t timeout);
status_t _user_mutex_sem_acquire(int32* sem, const char* name, uint32 flags,
bigtime_t timeout);
status_t _user_mutex_sem_release(int32* sem);
#ifdef __cplusplus
}

View File

@ -79,6 +79,9 @@ extern status_t _kern_mutex_lock(int32* mutex, const char* name,
extern status_t _kern_mutex_unlock(int32* mutex, uint32 flags);
extern status_t _kern_mutex_switch_lock(int32* fromMutex, int32* toMutex,
const char* name, uint32 flags, bigtime_t timeout);
extern status_t _kern_mutex_sem_acquire(int32* sem, const char* name,
uint32 flags, bigtime_t timeout);
extern status_t _kern_mutex_sem_release(int32* sem);
/* sem functions */
extern sem_id _kern_create_sem(int count, const char *name);

View File

@ -1,4 +1,5 @@
/*
* Copyright 2015, Hamish Morrison, hamishm53@gmail.com.
* Copyright 2010, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
@ -99,23 +100,9 @@ remove_user_mutex_entry(UserMutexEntry* entry)
static status_t
user_mutex_lock_locked(int32* mutex, addr_t physicalAddress, const char* name,
uint32 flags, bigtime_t timeout, MutexLocker& locker)
user_mutex_wait_locked(int32* mutex, addr_t physicalAddress, const char* name,
uint32 flags, bigtime_t timeout, MutexLocker& locker, bool& lastWaiter)
{
// mark the mutex locked + waiting
int32 oldValue = atomic_or(mutex,
B_USER_MUTEX_LOCKED | B_USER_MUTEX_WAITING);
// The mutex might have been unlocked (or disabled) in the meantime.
if ((oldValue & (B_USER_MUTEX_LOCKED | B_USER_MUTEX_WAITING)) == 0
|| (oldValue & B_USER_MUTEX_DISABLED) != 0) {
// clear the waiting flag and be done
atomic_and(mutex, ~(int32)B_USER_MUTEX_WAITING);
return B_OK;
}
// we have to wait
// add the entry to the table
UserMutexEntry entry;
entry.address = physicalAddress;
@ -131,18 +118,44 @@ user_mutex_lock_locked(int32* mutex, addr_t physicalAddress, const char* name,
status_t error = waitEntry.Wait(flags, timeout);
locker.Lock();
// dequeue if we weren't woken up
if (!entry.locked && !remove_user_mutex_entry(&entry)) {
// no one is waiting anymore -- clear the waiting flag
atomic_and(mutex, ~(int32)B_USER_MUTEX_WAITING);
if (error != B_OK && entry.locked)
error = B_OK;
if (!entry.locked) {
// if nobody woke us up, we have to dequeue ourselves
lastWaiter = !remove_user_mutex_entry(&entry);
} else {
// otherwise the waker has done the work of marking the
// mutex or semaphore uncontended
lastWaiter = false;
}
if (error != B_OK
&& (entry.locked || (*mutex & B_USER_MUTEX_DISABLED) != 0)) {
// timeout or interrupt, but the mutex was unlocked or disabled in time
error = B_OK;
return error;
}
static status_t
user_mutex_lock_locked(int32* mutex, addr_t physicalAddress,
const char* name, uint32 flags, bigtime_t timeout, MutexLocker& locker)
{
// mark the mutex locked + waiting
int32 oldValue = atomic_or(mutex,
B_USER_MUTEX_LOCKED | B_USER_MUTEX_WAITING);
if ((oldValue & (B_USER_MUTEX_LOCKED | B_USER_MUTEX_WAITING)) == 0
|| (oldValue & B_USER_MUTEX_DISABLED) != 0) {
// clear the waiting flag and be done
atomic_and(mutex, ~(int32)B_USER_MUTEX_WAITING);
return B_OK;
}
bool lastWaiter;
status_t error = user_mutex_wait_locked(mutex, physicalAddress, name,
flags, timeout, locker, lastWaiter);
if (lastWaiter)
atomic_and(mutex, ~(int32)B_USER_MUTEX_WAITING);
return error;
}
@ -185,6 +198,59 @@ user_mutex_unlock_locked(int32* mutex, addr_t physicalAddress, uint32 flags)
}
static status_t
user_mutex_sem_acquire_locked(int32* sem, addr_t physicalAddress,
const char* name, uint32 flags, bigtime_t timeout, MutexLocker& locker)
{
// The semaphore may have been released in the meantime, and we also
// need to mark it as contended if it isn't already.
int32 oldValue = atomic_get(sem);
while (oldValue > -1) {
int32 value = atomic_test_and_set(sem, oldValue - 1, oldValue);
if (value == oldValue && value > 0)
return B_OK;
oldValue = value;
}
bool lastWaiter;
status_t error = user_mutex_wait_locked(sem, physicalAddress, name, flags,
timeout, locker, lastWaiter);
if (lastWaiter)
atomic_test_and_set(sem, 0, -1);
return error;
}
static void
user_mutex_sem_release_locked(int32* sem, addr_t physicalAddress)
{
UserMutexEntry* entry = sUserMutexTable.Lookup(physicalAddress);
if (!entry) {
// no waiters - mark as uncontended and release
int32 oldValue = atomic_get(sem);
while (true) {
int32 inc = oldValue < 0 ? 2 : 1;
int32 value = atomic_test_and_set(sem, oldValue + inc, oldValue);
if (value == oldValue)
return;
oldValue = value;
}
}
bool otherWaiters = remove_user_mutex_entry(entry);
entry->locked = true;
entry->condition.NotifyOne();
if (!otherWaiters) {
// mark the semaphore uncontended
atomic_test_and_set(sem, 0, -1);
}
}
static status_t
user_mutex_lock(int32* mutex, const char* name, uint32 flags, bigtime_t timeout)
{
@ -312,3 +378,53 @@ _user_mutex_switch_lock(int32* fromMutex, int32* toMutex, const char* name,
return user_mutex_switch_lock(fromMutex, toMutex, name,
flags | B_CAN_INTERRUPT, timeout);
}
status_t
_user_mutex_sem_acquire(int32* sem, const char* name, uint32 flags,
bigtime_t timeout)
{
if (sem == NULL || !IS_USER_ADDRESS(sem) || (addr_t)sem % 4 != 0)
return B_BAD_ADDRESS;
syscall_restart_handle_timeout_pre(flags, timeout);
// wire the page and get the physical address
VMPageWiringInfo wiringInfo;
status_t error = vm_wire_page(B_CURRENT_TEAM, (addr_t)sem, true,
&wiringInfo);
if (error != B_OK)
return error;
{
MutexLocker locker(sUserMutexTableLock);
error = user_mutex_sem_acquire_locked(sem, wiringInfo.physicalAddress,
name, flags | B_CAN_INTERRUPT, timeout, locker);
}
vm_unwire_page(&wiringInfo);
return syscall_restart_handle_timeout_post(error, timeout);
}
status_t
_user_mutex_sem_release(int32* sem)
{
if (sem == NULL || !IS_USER_ADDRESS(sem) || (addr_t)sem % 4 != 0)
return B_BAD_ADDRESS;
// wire the page and get the physical address
VMPageWiringInfo wiringInfo;
status_t error = vm_wire_page(B_CURRENT_TEAM, (addr_t)sem, true,
&wiringInfo);
if (error != B_OK)
return error;
{
MutexLocker locker(sUserMutexTableLock);
user_mutex_sem_release_locked(sem, wiringInfo.physicalAddress);
}
vm_unwire_page(&wiringInfo);
return B_OK;
}

View File

@ -150,104 +150,6 @@ private:
};
class UnnamedSem : public SemInfo {
public:
UnnamedSem()
:
fID(0)
{
}
virtual ~UnnamedSem()
{
}
status_t Init(int32 semCount, const char* name)
{
return SemInfo::Init(semCount, name);
}
void SetID(sem_id id)
{
fID = id;
}
virtual sem_id ID() const
{
return fID;
}
virtual SemInfo* Clone()
{
sem_info info;
if (get_sem_info(SemaphoreID(), &info) != B_OK)
return NULL;
UnnamedSem* clone = new(std::nothrow) UnnamedSem;
if (clone == NULL)
return NULL;
if (clone->Init(info.count, info.name) != B_OK) {
delete clone;
return NULL;
}
clone->SetID(fID);
return clone;
}
virtual void Delete()
{
delete this;
}
private:
sem_id fID;
};
class UnnamedSharedSem : public SemInfo {
public:
UnnamedSharedSem()
{
}
virtual ~UnnamedSharedSem()
{
}
status_t Init(int32 semCount, const char* name)
{
return SemInfo::Init(semCount, name);
}
virtual sem_id ID() const
{
return SemaphoreID();
}
virtual SemInfo* Clone()
{
// Can't be cloned.
return NULL;
}
virtual void Delete()
{
delete this;
}
UnnamedSharedSem*& HashLink()
{
return fHashLink;
}
private:
UnnamedSharedSem* fHashLink;
};
struct NamedSemHashDefinition {
typedef const char* KeyType;
typedef NamedSem ValueType;
@ -274,32 +176,6 @@ struct NamedSemHashDefinition {
};
struct UnnamedSemHashDefinition {
typedef sem_id KeyType;
typedef UnnamedSharedSem ValueType;
size_t HashKey(const KeyType& key) const
{
return (size_t)key;
}
size_t Hash(UnnamedSharedSem* semaphore) const
{
return HashKey(semaphore->SemaphoreID());
}
bool Compare(const KeyType& key, UnnamedSharedSem* semaphore) const
{
return key == semaphore->SemaphoreID();
}
UnnamedSharedSem*& GetLink(UnnamedSharedSem* semaphore) const
{
return semaphore->HashLink();
}
};
class GlobalSemTable {
public:
GlobalSemTable()
@ -316,10 +192,7 @@ public:
status_t Init()
{
status_t error = fNamedSemaphores.Init();
if (error != B_OK)
return error;
return fUnnamedSemaphores.Init();
return fNamedSemaphores.Init();
}
status_t OpenNamedSem(const char* name, int openFlags, mode_t mode,
@ -393,61 +266,11 @@ public:
return B_OK;
}
status_t CreateUnnamedSem(uint32 semCount, int32_t& _id)
{
MutexLocker _(fLock);
if (fSemaphoreCount >= MAX_POSIX_SEMS)
return ENOSPC;
UnnamedSharedSem* sem = new(std::nothrow) UnnamedSharedSem;
if (sem == NULL)
return B_NO_MEMORY;
status_t error = sem->Init(semCount, "unnamed shared sem");
if (error == B_OK)
error = fUnnamedSemaphores.Insert(sem);
if (error != B_OK) {
delete sem;
return error;
}
fSemaphoreCount++;
_id = sem->SemaphoreID();
return B_OK;
}
status_t DeleteUnnamedSem(sem_id id)
{
MutexLocker _(fLock);
UnnamedSharedSem* sem = fUnnamedSemaphores.Lookup(id);
if (sem == NULL)
return B_BAD_VALUE;
fUnnamedSemaphores.Remove(sem);
delete sem;
fSemaphoreCount--;
return B_OK;
}
bool IsUnnamedValidSem(sem_id id)
{
MutexLocker _(fLock);
return fUnnamedSemaphores.Lookup(id) != NULL;
}
private:
typedef BOpenHashTable<NamedSemHashDefinition, true> NamedSemTable;
typedef BOpenHashTable<UnnamedSemHashDefinition, true> UnnamedSemTable;
mutex fLock;
NamedSemTable fNamedSemaphores;
UnnamedSemTable fUnnamedSemaphores;
int32 fSemaphoreCount;
};
@ -602,46 +425,6 @@ struct realtime_sem_context {
return context;
}
status_t CreateUnnamedSem(uint32 semCount, bool shared, int32_t& _id)
{
if (shared)
return sSemTable.CreateUnnamedSem(semCount, _id);
UnnamedSem* sem = new(std::nothrow) UnnamedSem;
if (sem == NULL)
return B_NO_MEMORY;
ObjectDeleter<UnnamedSem> semDeleter(sem);
status_t error = sem->Init(semCount, "unnamed sem");
if (error != B_OK)
return error;
TeamSemInfo* teamSem = new(std::nothrow) TeamSemInfo(sem, NULL);
if (teamSem == NULL)
return B_NO_MEMORY;
semDeleter.Detach();
MutexLocker _(fLock);
if (fSemaphoreCount >= MAX_POSIX_SEMS_PER_TEAM) {
delete teamSem;
return ENOSPC;
}
sem->SetID(_NextPrivateSemID());
error = fSemaphores.Insert(teamSem);
if (error != B_OK) {
delete teamSem;
return error;
}
fSemaphoreCount++;
_id = teamSem->ID();
return B_OK;
}
status_t OpenSem(const char* name, int openFlags, mode_t mode,
uint32 semCount, sem_t* userSem, sem_t*& _usedUserSem, int32_t& _id,
bool& _created)
@ -706,7 +489,7 @@ struct realtime_sem_context {
TeamSemInfo* sem = fSemaphores.Lookup(id);
if (sem == NULL)
return sSemTable.DeleteUnnamedSem(id);
return B_BAD_VALUE;
if (sem->Close()) {
// last reference closed
@ -724,10 +507,9 @@ struct realtime_sem_context {
MutexLocker locker(fLock);
TeamSemInfo* sem = fSemaphores.Lookup(id);
if (sem == NULL) {
if (!sSemTable.IsUnnamedValidSem(id))
return B_BAD_VALUE;
} else
if (sem == NULL)
return B_BAD_VALUE;
else
id = sem->SemaphoreID();
locker.Unlock();
@ -751,10 +533,9 @@ struct realtime_sem_context {
MutexLocker locker(fLock);
TeamSemInfo* sem = fSemaphores.Lookup(id);
if (sem == NULL) {
if (!sSemTable.IsUnnamedValidSem(id))
return B_BAD_VALUE;
} else
if (sem == NULL)
return B_BAD_VALUE;
else
id = sem->SemaphoreID();
locker.Unlock();
@ -768,10 +549,9 @@ struct realtime_sem_context {
MutexLocker locker(fLock);
TeamSemInfo* sem = fSemaphores.Lookup(id);
if (sem == NULL) {
if (!sSemTable.IsUnnamedValidSem(id))
if (sem == NULL)
return B_BAD_VALUE;
} else
else
id = sem->SemaphoreID();
locker.Unlock();
@ -914,23 +694,6 @@ _user_realtime_sem_open(const char* userName, int openFlagsOrShared,
if (!IS_USER_ADDRESS(userSem))
return B_BAD_ADDRESS;
// unnamed semaphores are less work -- deal with them first
if (userName == NULL) {
int32_t id;
status_t error = context->CreateUnnamedSem(semCount, openFlagsOrShared,
id);
if (error != B_OK)
return error;
if (user_memcpy(&userSem->id, &id, sizeof(int)) != B_OK) {
sem_t* dummy;
context->CloseSem(id, dummy);
return B_BAD_ADDRESS;
}
return B_OK;
}
// check user pointers
if (_usedUserSem == NULL)
return B_BAD_VALUE;
@ -954,7 +717,7 @@ _user_realtime_sem_open(const char* userName, int openFlagsOrShared,
return error;
// copy results back to userland
if (user_memcpy(&userSem->id, &id, sizeof(int)) != B_OK
if (user_memcpy(&userSem->u.named_sem_id, &id, sizeof(int32_t)) != B_OK
|| user_memcpy(_usedUserSem, &usedUserSem, sizeof(sem_t*)) != B_OK) {
if (created)
sSemTable.UnlinkNamedSem(name);

View File

@ -1,4 +1,5 @@
/*
* Copyright 2015, Hamish Morrison, hamishm53@gmail.com.
* Copyright 2008-2011, Ingo Weinhold, ingo_weinhold@gmx.de.
* Distributed under the terms of the MIT License.
*/
@ -17,6 +18,25 @@
#include <posix/realtime_sem_defs.h>
#include <syscall_utils.h>
#include <syscalls.h>
#include <user_mutex_defs.h>
#define SEM_TYPE_NAMED 1
#define SEM_TYPE_UNNAMED 2
static int32
atomic_add_if_greater(int32* value, int32 amount, int32 testValue)
{
int32 current = atomic_get(value);
while (current > testValue) {
int32 old = atomic_test_and_set(value, current + amount, current);
if (old == current)
return old;
current = old;
}
return current;
}
sem_t*
@ -50,6 +70,8 @@ sem_open(const char* name, int openFlags,...)
__set_errno(B_NO_MEMORY);
return SEM_FAILED;
}
sem->type = SEM_TYPE_NAMED;
MemoryDeleter semDeleter(sem);
// ask the kernel to open the semaphore
@ -72,7 +94,8 @@ int
sem_close(sem_t* semaphore)
{
sem_t* deleteSem = NULL;
status_t error = _kern_realtime_sem_close(semaphore->id, &deleteSem);
status_t error = _kern_realtime_sem_close(semaphore->u.named_sem_id,
&deleteSem);
if (error == B_OK)
free(deleteSem);
@ -90,35 +113,87 @@ sem_unlink(const char* name)
int
sem_init(sem_t* semaphore, int shared, unsigned value)
{
RETURN_AND_SET_ERRNO(_kern_realtime_sem_open(NULL, shared, 0, value,
semaphore, NULL));
semaphore->type = SEM_TYPE_UNNAMED;
semaphore->u.unnamed_sem = value;
return 0;
}
int
sem_destroy(sem_t* semaphore)
{
RETURN_AND_SET_ERRNO(_kern_realtime_sem_close(semaphore->id, NULL));
if (semaphore->type != SEM_TYPE_UNNAMED)
RETURN_AND_SET_ERRNO(EINVAL);
return 0;
}
static int
unnamed_sem_post(sem_t* semaphore) {
int32* sem = (int32*)&semaphore->u.unnamed_sem;
int32 oldValue = atomic_add_if_greater(sem, 1, -1);
if (oldValue > -1)
return 0;
return _kern_mutex_sem_release(sem);
}
static int
unnamed_sem_trywait(sem_t* semaphore) {
int32* sem = (int32*)&semaphore->u.unnamed_sem;
int32 oldValue = atomic_add_if_greater(sem, -1, 0);
if (oldValue > 0)
return 0;
return EAGAIN;
}
static int
unnamed_sem_timedwait(sem_t* semaphore, const struct timespec* timeout) {
int32* sem = (int32*)&semaphore->u.unnamed_sem;
bigtime_t timeoutMicros = B_INFINITE_TIMEOUT;
if (timeout != NULL) {
timeoutMicros = ((bigtime_t)timeout->tv_sec) * 1000000
+ timeout->tv_nsec / 1000;
}
int result = unnamed_sem_trywait(semaphore);
if (result == 0)
return 0;
return _kern_mutex_sem_acquire(sem, NULL,
timeoutMicros == B_INFINITE_TIMEOUT ? 0 : B_ABSOLUTE_REAL_TIME_TIMEOUT,
timeoutMicros);
}
int
sem_post(sem_t* semaphore)
{
RETURN_AND_SET_ERRNO(_kern_realtime_sem_post(semaphore->id));
status_t error;
if (semaphore->type == SEM_TYPE_NAMED)
error = _kern_realtime_sem_post(semaphore->u.named_sem_id);
else
error = unnamed_sem_post(semaphore);
RETURN_AND_SET_ERRNO(error);
}
int
sem_timedwait(sem_t* semaphore, const struct timespec* timeout)
static int
named_sem_timedwait(sem_t* semaphore, const struct timespec* timeout)
{
if (timeout != NULL
&& (timeout->tv_nsec < 0 || timeout->tv_nsec >= 1000000000)) {
status_t err = _kern_realtime_sem_wait(semaphore->id, 0);
status_t err = _kern_realtime_sem_wait(semaphore->u.named_sem_id, 0);
if (err == B_WOULD_BLOCK)
err = EINVAL;
// do nothing, return err as it is.
RETURN_AND_SET_ERRNO_TEST_CANCEL(err);
return err;
}
bigtime_t timeoutMicros = B_INFINITE_TIMEOUT;
@ -126,31 +201,62 @@ sem_timedwait(sem_t* semaphore, const struct timespec* timeout)
timeoutMicros = ((bigtime_t)timeout->tv_sec) * 1000000
+ timeout->tv_nsec / 1000;
}
status_t err = _kern_realtime_sem_wait(semaphore->id, timeoutMicros);
status_t err = _kern_realtime_sem_wait(semaphore->u.named_sem_id,
timeoutMicros);
if (err == B_WOULD_BLOCK)
err = ETIMEDOUT;
RETURN_AND_SET_ERRNO_TEST_CANCEL(err);
return err;
}
int
sem_trywait(sem_t* semaphore)
{
RETURN_AND_SET_ERRNO(_kern_realtime_sem_wait(semaphore->id, 0));
status_t error;
if (semaphore->type == SEM_TYPE_NAMED)
error = _kern_realtime_sem_wait(semaphore->u.named_sem_id, 0);
else
error = unnamed_sem_trywait(semaphore);
RETURN_AND_SET_ERRNO(error);
}
int
sem_wait(sem_t* semaphore)
{
RETURN_AND_SET_ERRNO_TEST_CANCEL(
_kern_realtime_sem_wait(semaphore->id, B_INFINITE_TIMEOUT));
status_t error;
if (semaphore->type == SEM_TYPE_NAMED)
error = named_sem_timedwait(semaphore, NULL);
else
error = unnamed_sem_timedwait(semaphore, NULL);
RETURN_AND_SET_ERRNO_TEST_CANCEL(error);
}
int
sem_timedwait(sem_t* semaphore, const struct timespec* timeout)
{
status_t error;
if (semaphore->type == SEM_TYPE_NAMED)
error = named_sem_timedwait(semaphore, timeout);
else
error = unnamed_sem_timedwait(semaphore, timeout);
RETURN_AND_SET_ERRNO_TEST_CANCEL(error);
}
int
sem_getvalue(sem_t* semaphore, int* value)
{
RETURN_AND_SET_ERRNO(_kern_realtime_sem_get_value(semaphore->id, value));
if (semaphore->type == SEM_TYPE_NAMED) {
RETURN_AND_SET_ERRNO(_kern_realtime_sem_get_value(
semaphore->u.named_sem_id, value));
} else {
*value = semaphore->u.unnamed_sem < 0 ? 0 : semaphore->u.unnamed_sem;
return 0;
}
}