JobQueue: fixed leak, notification, added Pop() variant.

* Was leaking fQueuedJobs on destruction.
* fHaveRunnableJobSem implementation was not completed; it was never
  released.
* Added Pop() variant that is a bit more flexible, and allows for a
  timeout as well as waiting even when the queue is empty, and can
  return a status code.
This commit is contained in:
Axel Dörfler 2015-05-31 19:03:34 +02:00
parent 3375ee66bc
commit 6ff95509c2
2 changed files with 51 additions and 18 deletions

View File

@ -30,8 +30,12 @@ public:
// gives up ownership
BJob* Pop();
status_t Pop(bigtime_t timeout, bool returnWhenEmpty,
BJob** _job);
// caller owns job
size_t CountJobs() const;
void Close();
private:
@ -49,7 +53,7 @@ private:
void _RequeueDependantJobsOf(BJob* job);
void _RemoveDependantJobsOf(BJob* job);
BLocker fLock;
mutable BLocker fLock;
uint32 fNextTicketNumber;
JobPriorityQueue* fQueuedJobs;
sem_id fHaveRunnableJobSem;

View File

@ -3,6 +3,7 @@
* Distributed under the terms of the MIT License.
*
* Authors:
* Axel Dörfler <axeld@pinc-software.de>
* Oliver Tappe <zooey@hirschkaefer.de>
*/
@ -50,6 +51,9 @@ class JobQueue::JobPriorityQueue
};
// #pragma mark -
JobQueue::JobQueue()
:
fLock("job queue"),
@ -61,6 +65,8 @@ JobQueue::JobQueue()
JobQueue::~JobQueue()
{
Close();
delete fQueuedJobs;
}
@ -89,6 +95,8 @@ JobQueue::AddJob(BJob* job)
}
BJob::Private(*job).SetTicketNumber(fNextTicketNumber++);
job->AddStateListener(this);
if (job->IsRunnable())
release_sem(fHaveRunnableJobSem);
}
return B_OK;
@ -137,35 +145,54 @@ JobQueue::JobFailed(BJob* job)
BJob*
JobQueue::Pop()
{
BJob* job;
if (Pop(B_INFINITE_TIMEOUT, true, &job) == B_OK)
return job;
return NULL;
}
status_t
JobQueue::Pop(bigtime_t timeout, bool returnWhenEmpty, BJob** _job)
{
BAutolock lock(&fLock);
if (lock.IsLocked()) {
JobPriorityQueue::iterator head = fQueuedJobs->begin();
if (head == fQueuedJobs->end())
return NULL;
while (!(*head)->IsRunnable()) {
// we need to wait until a job becomes runnable
while (true) {
JobPriorityQueue::iterator head = fQueuedJobs->begin();
if (head != fQueuedJobs->end()) {
if ((*head)->IsRunnable()) {
*_job = *head;
fQueuedJobs->erase(head);
return B_OK;
}
} else if (returnWhenEmpty)
return B_ENTRY_NOT_FOUND;
// we need to wait until a job becomes available/runnable
status_t result;
do {
lock.Unlock();
result = acquire_sem(fHaveRunnableJobSem);
result = acquire_sem_etc(fHaveRunnableJobSem, 1,
B_RELATIVE_TIMEOUT, timeout);
if (!lock.Lock())
return NULL;
return B_ERROR;
} while (result == B_INTERRUPTED);
if (result != B_OK)
return NULL;
// fetch current head, it must be runnable now
head = fQueuedJobs->begin();
if (head == fQueuedJobs->end())
return NULL;
return result;
}
BJob* job = *head;
fQueuedJobs->erase(head);
return job;
}
return NULL;
return B_ERROR;
}
size_t
JobQueue::CountJobs() const
{
BAutolock locker(fLock);
return fQueuedJobs->size();
}
@ -222,6 +249,8 @@ JobQueue::_RequeueDependantJobsOf(BJob* job)
dependantJob->RemoveDependency(job);
try {
fQueuedJobs->insert(dependantJob);
if (dependantJob->IsRunnable())
release_sem(fHaveRunnableJobSem);
} catch (...) {
}
}