kernel: Add event queue implementation to wait for objects efficiently.
Based on hamishm's original patch from 2015, but heavily modified, refactored, and reworked. From the original commit message: > When an object is deleted, a B_EVENT_INVALID event is delivered, > and the object is unregistered from the queue. > > The special event flag B_EVENT_ONE_SHOT can be passed in when adding > an object so that the object is automatically unregistered when an > event is delivered. Modifications to the original change include: * Removed the public interface (syscalls remain private for the moment) * Event list queueing/dequeueing almost entirely rewritten, including: - Clear events field when dequeueing. - Have B_EVENT_QUEUED actually indicate whether the event has been appended to the linked list (or not), based around lock state. The previous logic was prone to races and double-insertions. - "Modify" is now just "Deselect + Select" performed at once; previously it could cause use-after-frees. - Unlock for deselect only once at the end of dequeue. - Handle INVALID events still in the queue upon destruction, fixing memory leaks. * Deduplified code with wait_for_objects. * Use of C++ virtual dispatch instead of C-style enum + function calls, and BReferenceable plus destructors for teardown. * Removed select/modify/delete flags. Select/Modify are now the same operation on the syscall interface, and "Delete" is done when 0 is passed for "events". Additionally, the events selected can be fetched by passing -1 for "events". * Implemented level-triggered mode. * Use of BStackOrHeapArray and other convenience routines in syscalls. Change-Id: I1d2f094fd981c95215a59adbc087523c7bbbe40b Reviewed-on: https://review.haiku-os.org/c/haiku/+/6745 Tested-by: Commit checker robot <no-reply+buildbot@haiku-os.org> Reviewed-by: Jérôme Duval <jerome.duval@gmail.com>
This commit is contained in:
parent
a3f83f646c
commit
f66d2b46a8
|
@ -636,7 +636,7 @@ enum {
|
|||
|
||||
B_EVENT_ACQUIRE_SEMAPHORE = 0x0001, /* semaphore can be acquired */
|
||||
|
||||
B_EVENT_INVALID = 0x1000 /* FD/port/sem/thread ID not or
|
||||
B_EVENT_INVALID = 0x1000, /* FD/port/sem/thread ID not or
|
||||
no longer valid (e.g. has been
|
||||
close/deleted) */
|
||||
};
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright 2015, Hamish Morrison, hamishm53@gmail.com.
|
||||
* All rights reserved. Distributed under the terms of the MIT License.
|
||||
*/
|
||||
|
||||
#ifndef _KERNEL_EVENT_QUEUE_H
|
||||
#define _KERNEL_EVENT_QUEUE_H
|
||||
|
||||
#include <OS.h>
|
||||
#include <event_queue_defs.h>
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
|
||||
extern int _user_event_queue_create(int openFlags);
|
||||
extern status_t _user_event_queue_select(int queue, event_wait_info* userInfos,
|
||||
int numInfos);
|
||||
extern ssize_t _user_event_queue_wait(int queue, event_wait_info* infos,
|
||||
int numInfos, uint32 flags, bigtime_t timeout);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -15,6 +15,7 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
struct event_queue;
|
||||
struct file_descriptor;
|
||||
struct io_context;
|
||||
struct net_socket;
|
||||
|
@ -54,6 +55,7 @@ struct file_descriptor {
|
|||
struct vnode *vnode;
|
||||
struct fs_mount *mount;
|
||||
struct net_socket *socket;
|
||||
struct event_queue *queue;
|
||||
} u;
|
||||
void *cookie;
|
||||
int32 open_mode;
|
||||
|
@ -71,7 +73,8 @@ enum fd_types {
|
|||
FDTYPE_INDEX,
|
||||
FDTYPE_INDEX_DIR,
|
||||
FDTYPE_QUERY,
|
||||
FDTYPE_SOCKET
|
||||
FDTYPE_SOCKET,
|
||||
FDTYPE_EVENT_QUEUE
|
||||
};
|
||||
|
||||
// additional open mode - kernel special
|
||||
|
|
|
@ -15,18 +15,12 @@ struct select_sync;
|
|||
|
||||
|
||||
typedef struct select_info {
|
||||
struct select_info* next; // next in the object's list
|
||||
struct select_sync* sync;
|
||||
int32 events;
|
||||
uint16 selected_events;
|
||||
struct select_info* next;
|
||||
struct select_sync* sync;
|
||||
int32 events;
|
||||
uint16 selected_events;
|
||||
} select_info;
|
||||
|
||||
typedef struct select_sync {
|
||||
int32 ref_count;
|
||||
sem_id sem;
|
||||
uint32 count;
|
||||
struct select_info* set;
|
||||
} select_sync;
|
||||
|
||||
#define SELECT_FLAG(type) (1L << (type - 1))
|
||||
|
||||
|
@ -39,9 +33,10 @@ typedef struct select_sync {
|
|||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#endif
|
||||
|
||||
|
||||
extern void acquire_select_sync(select_sync* sync);
|
||||
extern void put_select_sync(select_sync* sync);
|
||||
extern status_t notify_select_events(select_info* info, uint16 events);
|
||||
extern void notify_select_events_list(select_info* list, uint16 events);
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright 2023, Haiku, Inc. All rights reserved.
|
||||
* Distributed under the terms of the MIT License.
|
||||
*/
|
||||
#ifndef _SYSTEM_EVENT_QUEUE_DEFS_H
|
||||
#define _SYSTEM_EVENT_QUEUE_DEFS_H
|
||||
|
||||
|
||||
// extends B_EVENT_* constants defined in OS.h
|
||||
enum {
|
||||
B_EVENT_LEVEL_TRIGGERED = (1 << 26), /* Event is level-triggered, not edge-triggered */
|
||||
B_EVENT_ONE_SHOT = (1 << 27), /* Delete event after delivery */
|
||||
|
||||
/* bits 28 through 30 are reserved for the kernel */
|
||||
};
|
||||
|
||||
|
||||
typedef struct event_wait_info {
|
||||
int32 object;
|
||||
uint16 type;
|
||||
int32 events; /* select(): > 0 to select, -1 to get selection, 0 to deselect */
|
||||
void* user_data;
|
||||
} event_wait_info;
|
||||
|
||||
|
||||
#endif /* _SYSTEM_EVENT_QUEUE_DEFS_H */
|
|
@ -23,6 +23,7 @@ extern "C" {
|
|||
|
||||
struct attr_info;
|
||||
struct dirent;
|
||||
struct event_wait_info;
|
||||
struct fd_info;
|
||||
struct fd_set;
|
||||
struct fs_info;
|
||||
|
@ -74,6 +75,12 @@ extern status_t _kern_get_safemode_option(const char *parameter,
|
|||
extern ssize_t _kern_wait_for_objects(object_wait_info* infos, int numInfos,
|
||||
uint32 flags, bigtime_t timeout);
|
||||
|
||||
extern int _kern_event_queue_create(int openFlags);
|
||||
extern status_t _kern_event_queue_select(int queue,
|
||||
struct event_wait_info* userInfos, int numInfos);
|
||||
extern ssize_t _kern_event_queue_wait(int queue, struct event_wait_info* infos,
|
||||
int numInfos, uint32 flags, bigtime_t timeout);
|
||||
|
||||
/* user mutex functions */
|
||||
extern status_t _kern_mutex_lock(int32* mutex, const char* name,
|
||||
uint32 flags, bigtime_t timeout);
|
||||
|
|
|
@ -60,6 +60,7 @@ KernelMergeObject kernel_core.o :
|
|||
# events
|
||||
wait_for_objects.cpp
|
||||
Notifications.cpp
|
||||
event_queue.cpp
|
||||
|
||||
# locks
|
||||
lock.cpp
|
||||
|
|
|
@ -0,0 +1,725 @@
|
|||
/*
|
||||
* Copyright 2015, Hamish Morrison, hamishm53@gmail.com.
|
||||
* Copyright 2023, Haiku, Inc. All rights reserved.
|
||||
* Distributed under the terms of the MIT License.
|
||||
*/
|
||||
|
||||
#include <event_queue.h>
|
||||
|
||||
#include <OS.h>
|
||||
|
||||
#include <AutoDeleter.h>
|
||||
|
||||
#include <fs/fd.h>
|
||||
#include <port.h>
|
||||
#include <sem.h>
|
||||
#include <syscalls.h>
|
||||
#include <syscall_restart.h>
|
||||
#include <thread.h>
|
||||
#include <util/AutoLock.h>
|
||||
#include <util/AVLTree.h>
|
||||
#include <util/DoublyLinkedList.h>
|
||||
#include <AutoDeleterDrivers.h>
|
||||
#include <StackOrHeapArray.h>
|
||||
#include <wait_for_objects.h>
|
||||
|
||||
#include "select_ops.h"
|
||||
#include "select_sync.h"
|
||||
|
||||
|
||||
enum {
|
||||
B_EVENT_QUEUED = (1 << 28),
|
||||
B_EVENT_SELECTING = (1 << 29),
|
||||
B_EVENT_DELETING = (1 << 30),
|
||||
/* (signed) */
|
||||
B_EVENT_PRIVATE_MASK = (0xf0000000)
|
||||
};
|
||||
|
||||
|
||||
#define EVENT_BEHAVIOR(events) ((events) & (B_EVENT_LEVEL_TRIGGERED | B_EVENT_ONE_SHOT))
|
||||
#define USER_EVENTS(events) ((events) & ~B_EVENT_PRIVATE_MASK)
|
||||
|
||||
#define B_EVENT_NON_MASKABLE (B_EVENT_INVALID | B_EVENT_ERROR | B_EVENT_DISCONNECTED)
|
||||
|
||||
|
||||
|
||||
struct select_event : select_info, AVLTreeNode,
|
||||
DoublyLinkedListLinkImpl<select_event> {
|
||||
int32 object;
|
||||
uint16 type;
|
||||
uint32 behavior;
|
||||
void* user_data;
|
||||
};
|
||||
|
||||
|
||||
struct EventQueueTreeDefinition {
|
||||
typedef struct {
|
||||
int32 object;
|
||||
uint16 type;
|
||||
} Key;
|
||||
typedef select_event Value;
|
||||
|
||||
AVLTreeNode* GetAVLTreeNode(Value* value) const
|
||||
{
|
||||
return value;
|
||||
}
|
||||
|
||||
Value* GetValue(AVLTreeNode* node) const
|
||||
{
|
||||
return static_cast<Value*>(node);
|
||||
}
|
||||
|
||||
int Compare(Key a, const Value* b) const
|
||||
{
|
||||
if (a.object != b->object)
|
||||
return a.object - b->object;
|
||||
else
|
||||
return a.type - b->type;
|
||||
}
|
||||
|
||||
int Compare(const Value* a, const Value* b) const
|
||||
{
|
||||
if (a->object != b->object)
|
||||
return a->object - b->object;
|
||||
else
|
||||
return a->type - b->type;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
// #pragma mark -- EventQueue implementation
|
||||
|
||||
|
||||
class EventQueue : public select_sync {
|
||||
public:
|
||||
EventQueue(bool kernel);
|
||||
~EventQueue();
|
||||
|
||||
void Closed();
|
||||
|
||||
status_t Select(int32 object, uint16 type, uint32 events, void* userData);
|
||||
status_t Query(int32 object, uint16 type, uint32* selectedEvents, void** userData);
|
||||
status_t Deselect(int32 object, uint16 type);
|
||||
|
||||
status_t Notify(select_info* info, uint16 events);
|
||||
|
||||
ssize_t Wait(event_wait_info* infos, int numInfos,
|
||||
int32 flags, bigtime_t timeout);
|
||||
|
||||
private:
|
||||
void _Notify(select_event* event, uint16 events);
|
||||
status_t _DeselectEvent(select_event* event);
|
||||
|
||||
ssize_t _DequeueEvents(event_wait_info* infos, int numInfos);
|
||||
|
||||
select_event* _GetEvent(int32 object, uint16 type);
|
||||
|
||||
private:
|
||||
typedef AVLTree<EventQueueTreeDefinition> EventTree;
|
||||
typedef DoublyLinkedList<select_event> EventList;
|
||||
|
||||
bool fKernel;
|
||||
bool fClosing;
|
||||
|
||||
/*
|
||||
* This flag is set in _DequeueEvents when we have to drop the lock to
|
||||
* deselect an object to prevent another _DequeueEvents call concurrently
|
||||
* modifying the list.
|
||||
*/
|
||||
bool fDequeueing;
|
||||
|
||||
EventList fEventList;
|
||||
EventTree fEventTree;
|
||||
|
||||
/*
|
||||
* Protects the queue. We cannot call select or deselect while holding
|
||||
* this, because it will invert the locking order with EventQueue::Notify.
|
||||
*/
|
||||
mutex fQueueLock;
|
||||
|
||||
/*
|
||||
* Notified when events are available on the queue.
|
||||
*/
|
||||
ConditionVariable fQueueCondition;
|
||||
|
||||
/*
|
||||
* Used to wait on a changing select_event while the queue lock is dropped
|
||||
* during a call to select/deselect.
|
||||
*/
|
||||
ConditionVariable fEventCondition;
|
||||
};
|
||||
|
||||
|
||||
EventQueue::EventQueue(bool kernel)
|
||||
:
|
||||
fKernel(kernel),
|
||||
fClosing(false),
|
||||
fDequeueing(false)
|
||||
{
|
||||
mutex_init(&fQueueLock, "event_queue lock");
|
||||
fQueueCondition.Init(this, "evtq wait");
|
||||
fEventCondition.Init(this, "event_queue event change wait");
|
||||
}
|
||||
|
||||
|
||||
EventQueue::~EventQueue()
|
||||
{
|
||||
mutex_lock(&fQueueLock);
|
||||
ASSERT(fClosing && !fDequeueing);
|
||||
|
||||
EventTree::Iterator iter = fEventTree.GetIterator();
|
||||
while (iter.HasNext()) {
|
||||
select_event* event = iter.Next();
|
||||
event->events |= B_EVENT_DELETING;
|
||||
|
||||
mutex_unlock(&fQueueLock);
|
||||
_DeselectEvent(event);
|
||||
mutex_lock(&fQueueLock);
|
||||
|
||||
iter.Remove();
|
||||
if ((event->events & B_EVENT_QUEUED) != 0)
|
||||
fEventList.Remove(event);
|
||||
delete event;
|
||||
}
|
||||
|
||||
EventList::Iterator listIter = fEventList.GetIterator();
|
||||
while (listIter.HasNext()) {
|
||||
select_event* event = listIter.Next();
|
||||
|
||||
// We already removed all events in the tree from this list.
|
||||
// The only remaining events will be INVALID ones already deselected.
|
||||
delete event;
|
||||
}
|
||||
|
||||
mutex_destroy(&fQueueLock);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
EventQueue::Closed()
|
||||
{
|
||||
MutexLocker locker(&fQueueLock);
|
||||
|
||||
fClosing = true;
|
||||
locker.Unlock();
|
||||
|
||||
// Wake up all waiters
|
||||
fQueueCondition.NotifyAll(B_FILE_ERROR);
|
||||
}
|
||||
|
||||
|
||||
status_t
|
||||
EventQueue::Select(int32 object, uint16 type, uint32 events, void* userData)
|
||||
{
|
||||
MutexLocker locker(&fQueueLock);
|
||||
|
||||
select_event* event = _GetEvent(object, type);
|
||||
if (event != NULL) {
|
||||
if ((event->selected_events | event->behavior)
|
||||
== (USER_EVENTS(events) | B_EVENT_NON_MASKABLE))
|
||||
return B_OK;
|
||||
|
||||
// Rather than try to reuse the event object, which would be complicated
|
||||
// and error-prone, perform a full de-selection and then re-selection.
|
||||
locker.Unlock();
|
||||
status_t status = Deselect(object, type);
|
||||
if (status != B_OK)
|
||||
return status;
|
||||
locker.Lock();
|
||||
|
||||
// Make sure nothing else re-selected before we reacquired the lock.
|
||||
event = _GetEvent(object, type);
|
||||
if (event != NULL)
|
||||
return EEXIST;
|
||||
}
|
||||
|
||||
event = new(std::nothrow) select_event;
|
||||
if (event == NULL)
|
||||
return B_NO_MEMORY;
|
||||
ObjectDeleter<select_event> eventDeleter(event);
|
||||
|
||||
event->sync = this;
|
||||
event->object = object;
|
||||
event->type = type;
|
||||
event->behavior = EVENT_BEHAVIOR(events);
|
||||
event->user_data = userData;
|
||||
event->events = 0;
|
||||
|
||||
status_t result = fEventTree.Insert(event);
|
||||
if (result != B_OK)
|
||||
return result;
|
||||
|
||||
// We drop the lock before calling select() to avoid inverting the
|
||||
// locking order with Notify(). Setting the B_EVENT_SELECTING flag prevents
|
||||
// the event from being used or even deleted before it is ready.
|
||||
event->events |= B_EVENT_SELECTING;
|
||||
event->selected_events = USER_EVENTS(events) | B_EVENT_NON_MASKABLE;
|
||||
|
||||
locker.Unlock();
|
||||
|
||||
status_t status = select_object(event->type, event->object, event, fKernel);
|
||||
if (status < 0) {
|
||||
locker.Lock();
|
||||
fEventTree.Remove(event);
|
||||
fEventCondition.NotifyAll();
|
||||
return status;
|
||||
}
|
||||
|
||||
eventDeleter.Detach();
|
||||
|
||||
atomic_and(&event->events, ~B_EVENT_SELECTING);
|
||||
fEventCondition.NotifyAll();
|
||||
|
||||
return B_OK;
|
||||
}
|
||||
|
||||
|
||||
status_t
|
||||
EventQueue::Query(int32 object, uint16 type, uint32* selectedEvents, void** userData)
|
||||
{
|
||||
MutexLocker locker(&fQueueLock);
|
||||
|
||||
select_event* event = _GetEvent(object, type);
|
||||
if (event == NULL)
|
||||
return B_ENTRY_NOT_FOUND;
|
||||
|
||||
*selectedEvents = event->selected_events | event->behavior;
|
||||
*userData = event->user_data;
|
||||
|
||||
return B_OK;
|
||||
}
|
||||
|
||||
|
||||
status_t
|
||||
EventQueue::Deselect(int32 object, uint16 type)
|
||||
{
|
||||
MutexLocker locker(&fQueueLock);
|
||||
|
||||
select_event* event = _GetEvent(object, type);
|
||||
if (event == NULL)
|
||||
return B_ENTRY_NOT_FOUND;
|
||||
|
||||
if ((atomic_or(&event->events, B_EVENT_DELETING) & B_EVENT_DELETING) != 0)
|
||||
return B_OK;
|
||||
|
||||
locker.Unlock();
|
||||
_DeselectEvent(event);
|
||||
locker.Lock();
|
||||
|
||||
if ((event->events & B_EVENT_INVALID) == 0)
|
||||
fEventTree.Remove(event);
|
||||
if ((event->events & B_EVENT_QUEUED) != 0)
|
||||
fEventList.Remove(event);
|
||||
|
||||
delete event;
|
||||
|
||||
locker.Unlock();
|
||||
fEventCondition.NotifyAll();
|
||||
return B_OK;
|
||||
}
|
||||
|
||||
|
||||
status_t
|
||||
EventQueue::_DeselectEvent(select_event* event)
|
||||
{
|
||||
return deselect_object(event->type, event->object, event, fKernel);
|
||||
}
|
||||
|
||||
|
||||
status_t
|
||||
EventQueue::Notify(select_info* info, uint16 events)
|
||||
{
|
||||
select_event* event = static_cast<select_event*>(info);
|
||||
_Notify(event, events);
|
||||
return B_OK;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
EventQueue::_Notify(select_event* event, uint16 events)
|
||||
{
|
||||
if ((events & event->selected_events) == 0)
|
||||
return;
|
||||
|
||||
const int32 previousEvents = atomic_or(&event->events, events);
|
||||
|
||||
// If the event is already being deleted, we should ignore this notification.
|
||||
if ((previousEvents & B_EVENT_DELETING) != 0)
|
||||
return;
|
||||
|
||||
// If the event is already queued, and it is not becoming invalid,
|
||||
// we don't need to do anything more.
|
||||
if ((previousEvents & B_EVENT_QUEUED) != 0 && (events & B_EVENT_INVALID) == 0)
|
||||
return;
|
||||
|
||||
{
|
||||
MutexLocker _(&fQueueLock);
|
||||
|
||||
// We need to recheck B_EVENT_DELETING now we have the lock.
|
||||
if ((event->events & B_EVENT_DELETING) != 0)
|
||||
return;
|
||||
|
||||
// If we get B_EVENT_INVALID it means the object we were monitoring was
|
||||
// deleted. The object's ID may now be reused, so we must remove it
|
||||
// from the event tree.
|
||||
if ((events & B_EVENT_INVALID) != 0)
|
||||
fEventTree.Remove(event);
|
||||
|
||||
// If it's not already queued, it's our responsibility to queue it.
|
||||
if ((atomic_or(&event->events, B_EVENT_QUEUED) & B_EVENT_QUEUED) == 0) {
|
||||
fEventList.Add(event);
|
||||
fQueueCondition.NotifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ssize_t
|
||||
EventQueue::Wait(event_wait_info* infos, int numInfos,
|
||||
int32 flags, bigtime_t timeout)
|
||||
{
|
||||
ASSERT((flags & B_ABSOLUTE_TIMEOUT) != 0
|
||||
|| (timeout == B_INFINITE_TIMEOUT || timeout == 0));
|
||||
|
||||
MutexLocker queueLocker(&fQueueLock);
|
||||
|
||||
ssize_t count = 0;
|
||||
while (timeout == 0 || (system_time() < timeout)) {
|
||||
while ((fDequeueing || fEventList.IsEmpty()) && !fClosing) {
|
||||
status_t status = fQueueCondition.Wait(queueLocker.Get(),
|
||||
flags | B_CAN_INTERRUPT, timeout);
|
||||
if (status != B_OK)
|
||||
return status;
|
||||
}
|
||||
|
||||
if (fClosing)
|
||||
return B_FILE_ERROR;
|
||||
|
||||
if (numInfos == 0)
|
||||
return B_OK;
|
||||
|
||||
fDequeueing = true;
|
||||
count = _DequeueEvents(infos, numInfos);
|
||||
fDequeueing = false;
|
||||
|
||||
if (count != 0)
|
||||
break;
|
||||
|
||||
// Due to level-triggered events, it is possible for the event list to have
|
||||
// been not empty and _DequeueEvents() still returns nothing. Hence, we loop.
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
ssize_t
|
||||
EventQueue::_DequeueEvents(event_wait_info* infos, int numInfos)
|
||||
{
|
||||
ssize_t count = 0;
|
||||
|
||||
const int32 kMaxToDeselect = 8;
|
||||
select_event* deselect[kMaxToDeselect];
|
||||
int32 deselectCount = 0;
|
||||
|
||||
// Add a marker element, so we don't loop forever after unlocking the list.
|
||||
// (There is only one invocation of _DequeueEvents() at a time.)
|
||||
select_event marker = {};
|
||||
fEventList.Add(&marker);
|
||||
|
||||
for (select_event* event = NULL; count < numInfos; ) {
|
||||
if (fEventList.Head() == NULL || fEventList.Head() == &marker)
|
||||
break;
|
||||
|
||||
event = fEventList.RemoveHead();
|
||||
int32 events = atomic_and(&event->events,
|
||||
~(event->selected_events | B_EVENT_QUEUED));
|
||||
|
||||
if ((events & B_EVENT_DELETING) != 0)
|
||||
continue;
|
||||
|
||||
if ((events & B_EVENT_INVALID) == 0
|
||||
&& (event->behavior & B_EVENT_LEVEL_TRIGGERED) != 0) {
|
||||
// This event is level-triggered. We need to deselect and reselect it,
|
||||
// as its state may have changed since we were notified.
|
||||
const select_event tmp = *event;
|
||||
|
||||
mutex_unlock(&fQueueLock);
|
||||
status_t status = Deselect(tmp.object, tmp.type);
|
||||
if (status == B_OK) {
|
||||
event = NULL;
|
||||
status = Select(tmp.object, tmp.type,
|
||||
tmp.selected_events | tmp.behavior, tmp.user_data);
|
||||
}
|
||||
mutex_lock(&fQueueLock);
|
||||
|
||||
if (status == B_OK) {
|
||||
// Is the event still queued?
|
||||
event = _GetEvent(tmp.object, tmp.type);
|
||||
if (event == NULL)
|
||||
continue;
|
||||
events = atomic_get(&event->events);
|
||||
if ((events & B_EVENT_QUEUED) == 0)
|
||||
continue;
|
||||
} else if (event == NULL) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
infos[count].object = event->object;
|
||||
infos[count].type = event->type;
|
||||
infos[count].user_data = event->user_data;
|
||||
infos[count].events = USER_EVENTS(events);
|
||||
count++;
|
||||
|
||||
// All logic past this point has to do with deleting events.
|
||||
if ((events & B_EVENT_INVALID) == 0 && (event->behavior & B_EVENT_ONE_SHOT) == 0)
|
||||
continue;
|
||||
|
||||
// Check if the event was requeued.
|
||||
if ((atomic_and(&event->events, ~B_EVENT_QUEUED) & B_EVENT_QUEUED) != 0)
|
||||
fEventList.Remove(event);
|
||||
|
||||
if ((events & B_EVENT_INVALID) != 0) {
|
||||
// The event will already have been removed from the tree.
|
||||
delete event;
|
||||
} else if ((event->behavior & B_EVENT_ONE_SHOT) != 0) {
|
||||
// We already checked B_EVENT_INVALID above, so we don't need to again.
|
||||
fEventTree.Remove(event);
|
||||
event->events = B_EVENT_DELETING;
|
||||
|
||||
deselect[deselectCount++] = event;
|
||||
if (deselectCount == kMaxToDeselect)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
fEventList.Remove(&marker);
|
||||
|
||||
if (deselectCount != 0) {
|
||||
mutex_unlock(&fQueueLock);
|
||||
for (int32 i = 0; i < deselectCount; i++) {
|
||||
select_event* event = deselect[i];
|
||||
|
||||
_DeselectEvent(event);
|
||||
delete event;
|
||||
}
|
||||
mutex_lock(&fQueueLock);
|
||||
|
||||
// We don't need to notify waiters, as we removed the events
|
||||
// from anywhere they could be found before dropping the lock.
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Get the select_event for the given object and type. Must be called with the
|
||||
* queue lock held. This method will sleep if the event is undergoing selection
|
||||
* or deletion.
|
||||
*/
|
||||
select_event*
|
||||
EventQueue::_GetEvent(int32 object, uint16 type)
|
||||
{
|
||||
EventQueueTreeDefinition::Key key = { object, type };
|
||||
|
||||
while (true) {
|
||||
select_event* event = fEventTree.Find(key);
|
||||
if (event == NULL)
|
||||
return NULL;
|
||||
|
||||
if ((event->events & (B_EVENT_SELECTING | B_EVENT_DELETING)) == 0)
|
||||
return event;
|
||||
|
||||
fEventCondition.Wait(&fQueueLock);
|
||||
|
||||
// At this point the select_event might have been deleted, so we
|
||||
// need to refetch it.
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// #pragma mark -- File descriptor ops
|
||||
|
||||
|
||||
|
||||
static status_t
|
||||
event_queue_close(file_descriptor* descriptor)
|
||||
{
|
||||
EventQueue* queue = (EventQueue*)descriptor->u.queue;
|
||||
queue->Closed();
|
||||
return B_OK;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
event_queue_free(file_descriptor* descriptor)
|
||||
{
|
||||
EventQueue* queue = (EventQueue*)descriptor->u.queue;
|
||||
put_select_sync(queue);
|
||||
}
|
||||
|
||||
|
||||
static status_t
|
||||
get_queue_descriptor(int fd, bool kernel, file_descriptor*& descriptor)
|
||||
{
|
||||
if (fd < 0)
|
||||
return B_FILE_ERROR;
|
||||
|
||||
descriptor = get_fd(get_current_io_context(kernel), fd);
|
||||
if (descriptor == NULL)
|
||||
return B_FILE_ERROR;
|
||||
|
||||
if (descriptor->type != FDTYPE_EVENT_QUEUE) {
|
||||
put_fd(descriptor);
|
||||
return B_BAD_VALUE;
|
||||
}
|
||||
|
||||
return B_OK;
|
||||
}
|
||||
|
||||
|
||||
#define GET_QUEUE_FD_OR_RETURN(fd, kernel, descriptor) \
|
||||
do { \
|
||||
status_t getError = get_queue_descriptor(fd, kernel, descriptor); \
|
||||
if (getError != B_OK) \
|
||||
return getError; \
|
||||
} while (false)
|
||||
|
||||
|
||||
static struct fd_ops sEventQueueFDOps = {
|
||||
NULL, // fd_read
|
||||
NULL, // fd_write
|
||||
NULL, // fd_seek
|
||||
NULL, // fd_ioctl
|
||||
NULL, // fd_set_flags
|
||||
NULL, // fd_select
|
||||
NULL, // fd_deselect
|
||||
NULL, // fd_read_dir
|
||||
NULL, // fd_rewind_dir
|
||||
NULL, // fd_read_stat
|
||||
NULL, // fd_write_stat
|
||||
&event_queue_close,
|
||||
&event_queue_free
|
||||
};
|
||||
|
||||
|
||||
// #pragma mark - User syscalls
|
||||
|
||||
|
||||
int
|
||||
_user_event_queue_create(int openFlags)
|
||||
{
|
||||
EventQueue* queue = new(std::nothrow) EventQueue(false);
|
||||
if (queue == NULL)
|
||||
return B_NO_MEMORY;
|
||||
|
||||
ObjectDeleter<EventQueue> deleter(queue);
|
||||
|
||||
file_descriptor* descriptor = alloc_fd();
|
||||
if (descriptor == NULL)
|
||||
return B_NO_MEMORY;
|
||||
|
||||
descriptor->type = FDTYPE_EVENT_QUEUE;
|
||||
descriptor->ops = &sEventQueueFDOps;
|
||||
descriptor->u.queue = (struct event_queue*)queue;
|
||||
descriptor->open_mode = O_RDWR | openFlags;
|
||||
|
||||
io_context* context = get_current_io_context(false);
|
||||
int fd = new_fd(context, descriptor);
|
||||
if (fd < 0) {
|
||||
free(descriptor);
|
||||
return fd;
|
||||
}
|
||||
|
||||
mutex_lock(&context->io_mutex);
|
||||
fd_set_close_on_exec(context, fd, (openFlags & O_CLOEXEC) != 0);
|
||||
mutex_unlock(&context->io_mutex);
|
||||
|
||||
deleter.Detach();
|
||||
return fd;
|
||||
}
|
||||
|
||||
|
||||
status_t
|
||||
_user_event_queue_select(int queue, event_wait_info* userInfos, int numInfos)
|
||||
{
|
||||
if (numInfos <= 0)
|
||||
return B_BAD_VALUE;
|
||||
if (userInfos == NULL || !IS_USER_ADDRESS(userInfos))
|
||||
return B_BAD_ADDRESS;
|
||||
|
||||
BStackOrHeapArray<event_wait_info, 16> infos(numInfos);
|
||||
if (!infos.IsValid())
|
||||
return B_NO_MEMORY;
|
||||
|
||||
file_descriptor* descriptor;
|
||||
GET_QUEUE_FD_OR_RETURN(queue, false, descriptor);
|
||||
DescriptorPutter _(descriptor);
|
||||
|
||||
EventQueue* eventQueue = (EventQueue*)descriptor->u.queue;
|
||||
|
||||
if (user_memcpy(infos, userInfos, sizeof(event_wait_info) * numInfos) != B_OK)
|
||||
return B_BAD_ADDRESS;
|
||||
|
||||
status_t result = B_OK;
|
||||
|
||||
for (int i = 0; i < numInfos; i++) {
|
||||
status_t error;
|
||||
if (infos[i].events > 0) {
|
||||
error = eventQueue->Select(infos[i].object, infos[i].type,
|
||||
infos[i].events, infos[i].user_data);
|
||||
} else if (infos[i].events < 0) {
|
||||
uint32 selectedEvents = 0;
|
||||
error = eventQueue->Query(infos[i].object, infos[i].type,
|
||||
&selectedEvents, &infos[i].user_data);
|
||||
if (error == B_OK) {
|
||||
infos[i].events = selectedEvents;
|
||||
error = user_memcpy(&userInfos[i], &infos[i], sizeof(event_wait_info));
|
||||
}
|
||||
} else /* == 0 */ {
|
||||
error = eventQueue->Deselect(infos[i].object, infos[i].type);
|
||||
}
|
||||
|
||||
if (error != B_OK) {
|
||||
user_memcpy(&userInfos[i].events, &error, sizeof(&userInfos[i].events));
|
||||
result = B_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
ssize_t
|
||||
_user_event_queue_wait(int queue, event_wait_info* userInfos, int numInfos,
|
||||
uint32 flags, bigtime_t timeout)
|
||||
{
|
||||
syscall_restart_handle_timeout_pre(flags, timeout);
|
||||
|
||||
if (numInfos < 0)
|
||||
return B_BAD_VALUE;
|
||||
if (numInfos > 0 && (userInfos == NULL || !IS_USER_ADDRESS(userInfos)))
|
||||
return B_BAD_ADDRESS;
|
||||
|
||||
BStackOrHeapArray<event_wait_info, 16> infos(numInfos);
|
||||
if (!infos.IsValid())
|
||||
return B_NO_MEMORY;
|
||||
|
||||
file_descriptor* descriptor;
|
||||
GET_QUEUE_FD_OR_RETURN(queue, false, descriptor);
|
||||
DescriptorPutter _(descriptor);
|
||||
|
||||
EventQueue* eventQueue = (EventQueue*)descriptor->u.queue;
|
||||
|
||||
ssize_t result = eventQueue->Wait(infos, numInfos, flags, timeout);
|
||||
if (result < 0)
|
||||
return syscall_restart_handle_timeout_post(result, timeout);
|
||||
|
||||
status_t status = B_OK;
|
||||
if (numInfos != 0)
|
||||
status = user_memcpy(userInfos, infos, sizeof(event_wait_info) * numInfos);
|
||||
|
||||
return status == B_OK ? result : status;
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Copyright 2015, Hamish Morrison, hamishm53@gmail.com.
|
||||
* All rights reserved. Distributed under the terms of the MIT License.
|
||||
*/
|
||||
#ifndef _KERNEL_SELECT_OPS_H
|
||||
#define _KERNEL_SELECT_OPS_H
|
||||
|
||||
|
||||
struct select_ops {
|
||||
status_t (*select)(int32 object, struct select_info* info, bool kernel);
|
||||
status_t (*deselect)(int32 object, struct select_info* info, bool kernel);
|
||||
};
|
||||
|
||||
static const select_ops kSelectOps[] = {
|
||||
// B_OBJECT_TYPE_FD
|
||||
{
|
||||
select_fd,
|
||||
deselect_fd
|
||||
},
|
||||
|
||||
// B_OBJECT_TYPE_SEMAPHORE
|
||||
{
|
||||
select_sem,
|
||||
deselect_sem
|
||||
},
|
||||
|
||||
// B_OBJECT_TYPE_PORT
|
||||
{
|
||||
select_port,
|
||||
deselect_port
|
||||
},
|
||||
|
||||
// B_OBJECT_TYPE_THREAD
|
||||
{
|
||||
select_thread,
|
||||
deselect_thread
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
static inline status_t
|
||||
select_object(uint32 type, int32 object, struct select_info* sync, bool kernel)
|
||||
{
|
||||
if (type >= B_COUNT_OF(kSelectOps))
|
||||
return B_BAD_VALUE;
|
||||
return kSelectOps[type].select(object, sync, kernel);
|
||||
}
|
||||
|
||||
|
||||
static inline status_t
|
||||
deselect_object(uint32 type, int32 object, struct select_info* sync, bool kernel)
|
||||
{
|
||||
if (type >= B_COUNT_OF(kSelectOps))
|
||||
return B_BAD_VALUE;
|
||||
return kSelectOps[type].deselect(object, sync, kernel);
|
||||
}
|
||||
|
||||
|
||||
#endif
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
* Copyright 2023, Haiku, Inc. All rights reserved.
|
||||
* Distributed under the terms of the MIT License.
|
||||
*/
|
||||
#ifndef _KERNEL_SELECT_SYNC_H
|
||||
#define _KERNEL_SELECT_SYNC_H
|
||||
|
||||
#include <wait_for_objects.h>
|
||||
#include <Referenceable.h>
|
||||
|
||||
|
||||
struct select_sync : public BReferenceable {
|
||||
virtual ~select_sync();
|
||||
|
||||
virtual status_t Notify(select_info* info, uint16 events) = 0;
|
||||
};
|
||||
|
||||
|
||||
#endif // _KERNEL_SELECT_SYNC_H
|
|
@ -21,6 +21,7 @@
|
|||
#include <AutoDeleter.h>
|
||||
#include <StackOrHeapArray.h>
|
||||
|
||||
#include <event_queue.h>
|
||||
#include <fs/fd.h>
|
||||
#include <port.h>
|
||||
#include <sem.h>
|
||||
|
@ -32,6 +33,9 @@
|
|||
#include <util/DoublyLinkedList.h>
|
||||
#include <vfs.h>
|
||||
|
||||
#include "select_ops.h"
|
||||
#include "select_sync.h"
|
||||
|
||||
|
||||
//#define TRACE_WAIT_FOR_OBJECTS
|
||||
#ifdef TRACE_WAIT_FOR_OBJECTS
|
||||
|
@ -59,40 +63,19 @@ struct select_sync_pool {
|
|||
};
|
||||
|
||||
|
||||
struct select_ops {
|
||||
status_t (*select)(int32 object, struct select_info* info, bool kernel);
|
||||
status_t (*deselect)(int32 object, struct select_info* info, bool kernel);
|
||||
struct wait_for_objects_sync : public select_sync {
|
||||
sem_id sem;
|
||||
uint32 count;
|
||||
struct select_info* set;
|
||||
|
||||
virtual ~wait_for_objects_sync();
|
||||
virtual status_t Notify(select_info* info, uint16 events);
|
||||
};
|
||||
|
||||
|
||||
static const select_ops kSelectOps[] = {
|
||||
// B_OBJECT_TYPE_FD
|
||||
{
|
||||
select_fd,
|
||||
deselect_fd
|
||||
},
|
||||
|
||||
// B_OBJECT_TYPE_SEMAPHORE
|
||||
{
|
||||
select_sem,
|
||||
deselect_sem
|
||||
},
|
||||
|
||||
// B_OBJECT_TYPE_PORT
|
||||
{
|
||||
select_port,
|
||||
deselect_port
|
||||
},
|
||||
|
||||
// B_OBJECT_TYPE_THREAD
|
||||
{
|
||||
select_thread,
|
||||
deselect_thread
|
||||
}
|
||||
};
|
||||
|
||||
static const uint32 kSelectOpsCount = sizeof(kSelectOps) / sizeof(select_ops);
|
||||
|
||||
select_sync::~select_sync()
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
#if WAIT_FOR_OBJECTS_TRACING
|
||||
|
@ -378,13 +361,13 @@ fd_zero(fd_set *set, int numFDs)
|
|||
|
||||
|
||||
static status_t
|
||||
create_select_sync(int numFDs, select_sync*& _sync)
|
||||
create_select_sync(int numFDs, wait_for_objects_sync*& _sync)
|
||||
{
|
||||
// create sync structure
|
||||
select_sync* sync = new(nothrow) select_sync;
|
||||
wait_for_objects_sync* sync = new(nothrow) wait_for_objects_sync;
|
||||
if (sync == NULL)
|
||||
return B_NO_MEMORY;
|
||||
ObjectDeleter<select_sync> syncDeleter(sync);
|
||||
ObjectDeleter<wait_for_objects_sync> syncDeleter(sync);
|
||||
|
||||
// create info set
|
||||
sync->set = new(nothrow) select_info[numFDs];
|
||||
|
@ -398,7 +381,6 @@ create_select_sync(int numFDs, select_sync*& _sync)
|
|||
return sync->sem;
|
||||
|
||||
sync->count = numFDs;
|
||||
sync->ref_count = 1;
|
||||
|
||||
for (int i = 0; i < numFDs; i++) {
|
||||
sync->set[i].next = NULL;
|
||||
|
@ -413,16 +395,43 @@ create_select_sync(int numFDs, select_sync*& _sync)
|
|||
}
|
||||
|
||||
|
||||
void
|
||||
acquire_select_sync(select_sync* sync)
|
||||
{
|
||||
FUNCTION(("acquire_select_sync(%p)\n", sync));
|
||||
sync->AcquireReference();
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
put_select_sync(select_sync* sync)
|
||||
{
|
||||
FUNCTION(("put_select_sync(%p): -> %ld\n", sync, sync->ref_count - 1));
|
||||
FUNCTION(("put_select_sync(%p): -> %ld\n", sync, sync->CountReferences() - 1));
|
||||
sync->ReleaseReference();
|
||||
}
|
||||
|
||||
if (atomic_add(&sync->ref_count, -1) == 1) {
|
||||
delete_sem(sync->sem);
|
||||
delete[] sync->set;
|
||||
delete sync;
|
||||
}
|
||||
|
||||
wait_for_objects_sync::~wait_for_objects_sync()
|
||||
{
|
||||
delete_sem(sem);
|
||||
delete[] set;
|
||||
}
|
||||
|
||||
|
||||
status_t
|
||||
wait_for_objects_sync::Notify(select_info* info, uint16 events)
|
||||
{
|
||||
if (sem < B_OK)
|
||||
return B_BAD_VALUE;
|
||||
|
||||
atomic_or(&info->events, events);
|
||||
|
||||
// only wake up the waiting select()/poll() call if the events
|
||||
// match one of the selected ones
|
||||
if (info->selected_events & events)
|
||||
return release_sem_etc(sem, 1, B_DO_NOT_RESCHEDULE);
|
||||
|
||||
return B_OK;
|
||||
}
|
||||
|
||||
|
||||
|
@ -448,7 +457,7 @@ common_select(int numFDs, fd_set *readSet, fd_set *writeSet, fd_set *errorSet,
|
|||
}
|
||||
|
||||
// allocate sync object
|
||||
select_sync* sync;
|
||||
wait_for_objects_sync* sync;
|
||||
status = create_select_sync(numFDs, sync);
|
||||
if (status != B_OK)
|
||||
return status;
|
||||
|
@ -561,7 +570,7 @@ common_poll(struct pollfd *fds, nfds_t numFDs, bigtime_t timeout,
|
|||
const sigset_t *sigMask, bool kernel)
|
||||
{
|
||||
// allocate sync object
|
||||
select_sync* sync;
|
||||
wait_for_objects_sync* sync;
|
||||
status_t status = create_select_sync(numFDs, sync);
|
||||
if (status != B_OK)
|
||||
return status;
|
||||
|
@ -654,7 +663,7 @@ common_wait_for_objects(object_wait_info* infos, int numInfos, uint32 flags,
|
|||
status_t status = B_OK;
|
||||
|
||||
// allocate sync object
|
||||
select_sync* sync;
|
||||
wait_for_objects_sync* sync;
|
||||
status = create_select_sync(numInfos, sync);
|
||||
if (status != B_OK)
|
||||
return status;
|
||||
|
@ -672,8 +681,7 @@ common_wait_for_objects(object_wait_info* infos, int numInfos, uint32 flags,
|
|||
sync->set[i].events = 0;
|
||||
infos[i].events = 0;
|
||||
|
||||
if (type >= kSelectOpsCount
|
||||
|| kSelectOps[type].select(object, sync->set + i, kernel) != B_OK) {
|
||||
if (select_object(type, object, sync->set + i, kernel) != B_OK) {
|
||||
sync->set[i].events = B_EVENT_INVALID;
|
||||
infos[i].events = B_EVENT_INVALID;
|
||||
// indicates that the object doesn't need to be deselected
|
||||
|
@ -691,8 +699,8 @@ common_wait_for_objects(object_wait_info* infos, int numInfos, uint32 flags,
|
|||
for (int i = 0; i < numInfos; i++) {
|
||||
uint16 type = infos[i].type;
|
||||
|
||||
if (type < kSelectOpsCount && (infos[i].events & B_EVENT_INVALID) == 0)
|
||||
kSelectOps[type].deselect(infos[i].object, sync->set + i, kernel);
|
||||
if ((infos[i].events & B_EVENT_INVALID) == 0)
|
||||
deselect_object(type, infos[i].object, sync->set + i, kernel);
|
||||
}
|
||||
|
||||
// collect the events that have happened in the meantime
|
||||
|
@ -725,19 +733,10 @@ notify_select_events(select_info* info, uint16 events)
|
|||
FUNCTION(("notify_select_events(%p (%p), 0x%x)\n", info, info->sync,
|
||||
events));
|
||||
|
||||
if (info == NULL
|
||||
|| info->sync == NULL
|
||||
|| info->sync->sem < B_OK)
|
||||
if (info == NULL || info->sync == NULL)
|
||||
return B_BAD_VALUE;
|
||||
|
||||
atomic_or(&info->events, events);
|
||||
|
||||
// only wake up the waiting select()/poll() call if the events
|
||||
// match one of the selected ones
|
||||
if (info->selected_events & events)
|
||||
return release_sem_etc(info->sync->sem, 1, B_DO_NOT_RESCHEDULE);
|
||||
|
||||
return B_OK;
|
||||
return info->sync->Notify(info, events);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -633,7 +633,7 @@ select_fd(int32 fd, struct select_info* info, bool kernel)
|
|||
|
||||
// As long as the info is in the list, we keep a reference to the sync
|
||||
// object.
|
||||
atomic_add(&info->sync->ref_count, 1);
|
||||
acquire_select_sync(info->sync);
|
||||
|
||||
// Finally release our open reference. It is safe just to decrement,
|
||||
// since as long as the descriptor is associated with the slot,
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include <debug.h>
|
||||
#include <disk_device_manager/ddm_userland_interface.h>
|
||||
#include <elf.h>
|
||||
#include <event_queue.h>
|
||||
#include <frame_buffer_console.h>
|
||||
#include <fs/fd.h>
|
||||
#include <fs/node_monitor.h>
|
||||
|
|
|
@ -2619,7 +2619,7 @@ select_thread(int32 id, struct select_info* info, bool kernel)
|
|||
thread->select_infos = info;
|
||||
|
||||
// we need a sync reference
|
||||
atomic_add(&info->sync->ref_count, 1);
|
||||
acquire_select_sync(info->sync);
|
||||
}
|
||||
|
||||
return B_OK;
|
||||
|
|
Loading…
Reference in New Issue