* Added support for an iteration callback in IORequest.

* Split IORequest::ChunkFinished() into OperationFinished() and
  and SubrequestFinished(). Moved the notification part into a new
  method NotifyFinished().
* Added new IOScheduler thread for notifying finished requests.
  IOScheduler::_Finisher() hands over finished request to it, unless the
  requests don't have callbacks. We need the separate thread, since the
  callbacks can potentially reenter the scheduler and thus cause
  deadlocks.


git-svn-id: file:///srv/svn/repos/haiku/haiku/trunk@26631 a95241bf-73f2-0310-859d-f6bbb57e9c96
This commit is contained in:
Ingo Weinhold 2008-07-25 13:48:05 +00:00
parent e346074894
commit 8c9804851b
4 changed files with 194 additions and 50 deletions

View File

@ -40,6 +40,8 @@ IOScheduler::IOScheduler(DMAResource* resource)
IOScheduler::~IOScheduler() IOScheduler::~IOScheduler()
{ {
// TODO: Shutdown threads.
mutex_lock(&fLock); mutex_lock(&fLock);
mutex_destroy(&fLock); mutex_destroy(&fLock);
@ -53,6 +55,7 @@ IOScheduler::Init(const char* name)
{ {
fNewRequestCondition.Init(this, "I/O new request"); fNewRequestCondition.Init(this, "I/O new request");
fFinishedOperationCondition.Init(this, "I/O finished operation"); fFinishedOperationCondition.Init(this, "I/O finished operation");
fFinishedRequestCondition.Init(this, "I/O finished request");
size_t count = fDMAResource != NULL ? fDMAResource->BufferCount() : 16; size_t count = fDMAResource != NULL ? fDMAResource->BufferCount() : 16;
for (size_t i = 0; i < count; i++) { for (size_t i = 0; i < count; i++) {
@ -63,13 +66,24 @@ IOScheduler::Init(const char* name)
fUnusedOperations.Add(operation); fUnusedOperations.Add(operation);
} }
// start thread for device // start threads
fThread = spawn_kernel_thread(&_SchedulerThread, name, B_NORMAL_PRIORITY, char buffer[B_OS_NAME_LENGTH];
(void *)this); strlcpy(buffer, name, sizeof(buffer));
if (fThread < B_OK) strlcat(buffer, " scheduler", sizeof(buffer));
return fThread; fSchedulerThread = spawn_kernel_thread(&_SchedulerThread, buffer,
B_NORMAL_PRIORITY, (void *)this);
if (fSchedulerThread < B_OK)
return fSchedulerThread;
resume_thread(fThread); strlcpy(buffer, name, sizeof(buffer));
strlcat(buffer, " notifier", sizeof(buffer));
fRequestNotifierThread = spawn_kernel_thread(&_RequestNotifierThread,
buffer, B_NORMAL_PRIORITY, (void *)this);
if (fRequestNotifierThread < B_OK)
return fRequestNotifierThread;
resume_thread(fSchedulerThread);
resume_thread(fRequestNotifierThread);
return B_OK; return B_OK;
} }
@ -89,13 +103,6 @@ IOScheduler::SetCallback(io_callback callback, void* data)
} }
/*static*/ status_t
IOScheduler::_IOCallbackWrapper(void* data, io_operation* operation)
{
return ((IOCallback*)data)->DoIO(operation);
}
status_t status_t
IOScheduler::ScheduleRequest(IORequest* request) IOScheduler::ScheduleRequest(IORequest* request)
{ {
@ -108,7 +115,7 @@ IOScheduler::ScheduleRequest(IORequest* request)
// lock memory (via another thread or a dedicated call). // lock memory (via another thread or a dedicated call).
if (buffer->IsVirtual()) { if (buffer->IsVirtual()) {
status_t status = buffer->LockMemory(B_CURRENT_TEAM, status_t status = buffer->LockMemory(request->Team(),
request->IsWrite()); request->IsWrite());
if (status != B_OK) if (status != B_OK)
return status; return status;
@ -146,7 +153,8 @@ IOScheduler::OperationCompleted(IOOperation* operation, status_t status)
if (fWaiting) { if (fWaiting) {
SpinLocker _2(thread_spinlock); SpinLocker _2(thread_spinlock);
thread_interrupt(thread_get_thread_struct_locked(fThread), false); thread_interrupt(thread_get_thread_struct_locked(fSchedulerThread),
false);
} }
} }
@ -176,7 +184,7 @@ IOScheduler::_Finisher()
// notify request and remove operation // notify request and remove operation
IORequest* request = operation->Parent(); IORequest* request = operation->Parent();
if (request != NULL) if (request != NULL)
request->ChunkFinished(operation, operation->Status(), true); request->OperationFinished(operation, operation->Status());
// recycle the operation // recycle the operation
MutexLocker _(fLock); MutexLocker _(fLock);
@ -184,6 +192,19 @@ IOScheduler::_Finisher()
fDMAResource->RecycleBuffer(operation->Buffer()); fDMAResource->RecycleBuffer(operation->Buffer());
fUnusedOperations.Add(operation); fUnusedOperations.Add(operation);
// If the request is done, we need to perform its notifications.
if (request->IsFinished()) {
if (request->HasCallbacks()) {
// The request has callbacks that may take some time to perform,
// so we hand it over to the request notifier.
fFinishedRequests.Add(request);
fFinishedRequestCondition.NotifyAll();
} else {
// No callbacks -- finish the request right now.
request->NotifyFinished();
}
}
} }
} }
@ -305,10 +326,53 @@ IOScheduler::_Scheduler()
} }
status_t /*static*/ status_t
IOScheduler::_SchedulerThread(void *_self) IOScheduler::_SchedulerThread(void *_self)
{ {
IOScheduler *self = (IOScheduler *)_self; IOScheduler *self = (IOScheduler *)_self;
return self->_Scheduler(); return self->_Scheduler();
} }
status_t
IOScheduler::_RequestNotifier()
{
while (true) {
MutexLocker locker(fLock);
// get a request
IORequest* request = fFinishedRequests.RemoveHead();
if (request == NULL) {
ConditionVariableEntry entry;
fFinishedRequestCondition.Add(&entry);
fWaiting = true;
locker.Unlock();
entry.Wait();
continue;
}
locker.Unlock();
// notify the request
request->NotifyFinished();
}
}
/*static*/ status_t
IOScheduler::_RequestNotifierThread(void *_self)
{
IOScheduler *self = (IOScheduler*)_self;
return self->_RequestNotifier();
}
/*static*/ status_t
IOScheduler::_IOCallbackWrapper(void* data, io_operation* operation)
{
return ((IOCallback*)data)->DoIO(operation);
}

View File

@ -51,6 +51,8 @@ private:
IORequest* _GetNextUnscheduledRequest(); IORequest* _GetNextUnscheduledRequest();
status_t _Scheduler(); status_t _Scheduler();
static status_t _SchedulerThread(void* self); static status_t _SchedulerThread(void* self);
status_t _RequestNotifier();
static status_t _RequestNotifierThread(void* self);
static status_t _IOCallbackWrapper(void* data, static status_t _IOCallbackWrapper(void* data,
io_operation* operation); io_operation* operation);
@ -59,12 +61,15 @@ private:
DMAResource* fDMAResource; DMAResource* fDMAResource;
spinlock fFinisherLock; spinlock fFinisherLock;
mutex fLock; mutex fLock;
thread_id fThread; thread_id fSchedulerThread;
thread_id fRequestNotifierThread;
io_callback fIOCallback; io_callback fIOCallback;
void* fIOCallbackData; void* fIOCallbackData;
IORequestList fUnscheduledRequests; IORequestList fUnscheduledRequests;
IORequestList fFinishedRequests;
ConditionVariable fNewRequestCondition; ConditionVariable fNewRequestCondition;
ConditionVariable fFinishedOperationCondition; ConditionVariable fFinishedOperationCondition;
ConditionVariable fFinishedRequestCondition;
IOOperationList fUnusedOperations; IOOperationList fUnusedOperations;
IOOperationList fCompletedOperations; IOOperationList fCompletedOperations;
bool fWaiting; bool fWaiting;

View File

@ -486,7 +486,9 @@ IOOperation::_CopyPartialEnd(bool isWrite)
IORequest::IORequest() IORequest::IORequest()
: :
fFinishedCallback(NULL), fFinishedCallback(NULL),
fFinishedCookie(NULL) fFinishedCookie(NULL),
fIterationCallback(NULL),
fIterationCookie(NULL)
{ {
mutex_init(&fLock, "I/O request lock"); mutex_init(&fLock, "I/O request lock");
fFinishedCondition.Init(this, "I/O request finished"); fFinishedCondition.Init(this, "I/O request finished");
@ -541,14 +543,21 @@ IORequest::Init(off_t offset, iovec* vecs, size_t count, size_t length,
void void
IORequest::SetFinishedCallback(io_request_finished_callback callback, IORequest::SetFinishedCallback(io_request_callback callback, void* cookie)
void* cookie)
{ {
fFinishedCallback = callback; fFinishedCallback = callback;
fFinishedCookie = cookie; fFinishedCookie = cookie;
} }
void
IORequest::SetIterationCallback(io_request_callback callback, void* cookie)
{
fIterationCallback = callback;
fIterationCookie = cookie;
}
status_t status_t
IORequest::Wait(uint32 flags, bigtime_t timeout) IORequest::Wait(uint32 flags, bigtime_t timeout)
{ {
@ -571,17 +580,68 @@ IORequest::Wait(uint32 flags, bigtime_t timeout)
void void
IORequest::ChunkFinished(IORequestChunk* chunk, status_t status, bool remove) IORequest::NotifyFinished()
{ {
TRACE("IORequest::ChunkFinished(%p, %#lx, %d): request: %p\n", chunk, TRACE("IORequest::NotifyFinished(): request: %p\n", this);
status, remove, this);
MutexLocker locker(fLock); MutexLocker locker(fLock);
if (remove) { if (fStatus == B_OK && RemainingBytes() > 0) {
fChildren.Remove(chunk); // The request is not really done yet. If it has an iteration callback,
chunk->SetParent(NULL); // call it.
if (fIterationCallback != NULL) {
locker.Unlock();
fIterationCallback(fIterationCookie, this);
} }
} else {
// Cache the callbacks before we unblock waiters and unlock. Any of the
// following could delete this request, so we don't want to touch it
// once we have started telling others that it is done.
IORequest* parent = fParent;
io_request_callback finishedCallback = fFinishedCallback;
void* finishedCookie = fFinishedCookie;
status_t status = fStatus;
// unblock waiters
fFinishedCondition.NotifyAll();
locker.Unlock();
// notify callback
if (finishedCallback != NULL)
finishedCallback(finishedCookie, this);
// notify parent
if (parent != NULL)
parent->SubrequestFinished(this, status);
}
}
/*! Returns whether this request or any of it's ancestors has a finished or
notification callback. Used to decide whether NotifyFinished() can be called
synchronously.
*/
bool
IORequest::HasCallbacks() const
{
if (fFinishedCallback != NULL || fIterationCallback != NULL)
return true;
return fParent != NULL && fParent->HasCallbacks();
}
void
IORequest::OperationFinished(IOOperation* operation, status_t status)
{
TRACE("IORequest::OperationFinished(%p, %#lx): request: %p\n", operation,
status, this);
MutexLocker locker(fLock);
fChildren.Remove(operation);
operation->SetParent(NULL);
if (status != B_OK && fStatus == 1) if (status != B_OK && fStatus == 1)
fStatus = status; fStatus = status;
@ -600,27 +660,32 @@ IORequest::ChunkFinished(IORequestChunk* chunk, status_t status, bool remove)
// not for its ancestors. // not for its ancestors.
if (fBuffer->IsVirtual()) if (fBuffer->IsVirtual())
fBuffer->UnlockMemory(fTeam, fIsWrite); fBuffer->UnlockMemory(fTeam, fIsWrite);
}
// Cache the callbacks before we unblock waiters and unlock. Any of the
// following could delete this request, so we don't want to touch it once
// we have started telling others that it is done.
IORequest* parent = fParent;
io_request_finished_callback finishedCallback = fFinishedCallback;
void* finishedCookie = fFinishedCookie;
status = fStatus;
// unblock waiters void
fFinishedCondition.NotifyAll(); IORequest::SubrequestFinished(IORequest* request, status_t status)
{
TRACE("IORequest::SubrequestFinished(%p, %#lx): request: %p\n", request,
status, this);
MutexLocker locker(fLock);
if (status != B_OK && fStatus == 1)
fStatus = status;
if (--fPendingChildren > 0)
return;
// last child finished
// set status, if not done yet
if (fStatus == 1)
fStatus = B_OK;
locker.Unlock(); locker.Unlock();
// notify callback NotifyFinished();
if (finishedCallback != NULL)
finishedCallback(finishedCookie, this);
// notify parent
if (parent != NULL)
parent->ChunkFinished(this, status, false);
} }

View File

@ -165,8 +165,7 @@ typedef IOOperation io_operation;
typedef DoublyLinkedList<IOOperation> IOOperationList; typedef DoublyLinkedList<IOOperation> IOOperationList;
typedef struct IORequest io_request; typedef struct IORequest io_request;
typedef status_t (*io_request_finished_callback)(void* data, typedef status_t (*io_request_callback)(void* data, io_request* request);
io_request* request);
struct IORequest : IORequestChunk, DoublyLinkedListLinkImpl<IORequest> { struct IORequest : IORequestChunk, DoublyLinkedListLinkImpl<IORequest> {
@ -179,15 +178,24 @@ struct IORequest : IORequestChunk, DoublyLinkedListLinkImpl<IORequest> {
size_t length, bool write, uint32 flags); size_t length, bool write, uint32 flags);
void SetFinishedCallback( void SetFinishedCallback(
io_request_finished_callback callback, io_request_callback callback,
void* cookie);
void SetIterationCallback(
io_request_callback callback,
void* cookie); void* cookie);
status_t Wait(uint32 flags = 0, bigtime_t timeout = 0); status_t Wait(uint32 flags = 0, bigtime_t timeout = 0);
bool IsFinished() const { return fStatus != 1; } bool IsFinished() const
{ return fStatus != 1
&& fPendingChildren == 0; }
void NotifyFinished();
bool HasCallbacks() const;
void ChunkFinished(IORequestChunk* chunk, void OperationFinished(IOOperation* operation,
status_t status, bool remove); status_t status);
void SubrequestFinished(IORequest* request,
status_t status);
size_t RemainingBytes() const size_t RemainingBytes() const
{ return fRemainingBytes; } { return fRemainingBytes; }
@ -234,8 +242,10 @@ private:
team_id fTeam; team_id fTeam;
bool fIsWrite; bool fIsWrite;
io_request_finished_callback fFinishedCallback; io_request_callback fFinishedCallback;
void* fFinishedCookie; void* fFinishedCookie;
io_request_callback fIterationCallback;
void* fIterationCookie;
ConditionVariable fFinishedCondition; ConditionVariable fFinishedCondition;
// these are for iteration // these are for iteration