launch_daemon: Moved (Main)Worker to its own file.

This commit is contained in:
Axel Dörfler 2015-06-01 16:37:47 +02:00
parent f913a08780
commit 0337b4adfd
4 changed files with 158 additions and 136 deletions

View File

@ -6,6 +6,7 @@ UsePrivateSystemHeaders ;
Server launch_daemon
:
LaunchDaemon.cpp
Worker.cpp
# init jobs
AbstractEmptyDirectoryJob.cpp

View File

@ -21,16 +21,16 @@
#include <AppMisc.h>
#include <DriverSettingsMessageAdapter.h>
#include <JobQueue.h>
#include <LaunchDaemonDefs.h>
#include <syscalls.h>
#include "InitRealTimeClockJob.h"
#include "InitSharedMemoryDirectoryJob.h"
#include "InitTemporaryDirectoryJob.h"
#include "Worker.h"
using namespace BPrivate;
using namespace ::BPrivate;
using namespace BSupportKit;
using BSupportKit::BPrivate::JobQueue;
@ -139,38 +139,6 @@ protected:
};
class Worker {
public:
Worker(JobQueue& queue);
virtual ~Worker();
protected:
virtual status_t Process();
virtual bigtime_t Timeout() const;
virtual status_t Run(BJob* job);
private:
static status_t _Process(void* self);
protected:
thread_id fThread;
JobQueue& fJobQueue;
};
class MainWorker : public Worker {
public:
MainWorker(JobQueue& queue);
protected:
virtual bigtime_t Timeout() const;
virtual status_t Run(BJob* job);
private:
int32 fCPUCount;
};
typedef std::map<BString, Job*> JobMap;
@ -211,12 +179,6 @@ private:
};
static const bigtime_t kWorkerTimeout = 1000000;
// One second until a worker thread quits without a job
static int32 sWorkerCount;
static const char*
get_leaf(const char* signature)
{
@ -534,102 +496,6 @@ Target::Execute()
// #pragma mark -
Worker::Worker(JobQueue& queue)
:
fJobQueue(queue)
{
fThread = spawn_thread(&Worker::_Process, "worker", B_NORMAL_PRIORITY,
this);
if (fThread >= 0 && resume_thread(fThread) == B_OK)
atomic_add(&sWorkerCount, 1);
}
Worker::~Worker()
{
}
status_t
Worker::Process()
{
while (true) {
BJob* job;
status_t status = fJobQueue.Pop(Timeout(), false, &job);
if (status != B_OK)
return status;
Run(job);
// TODO: proper error reporting on failed job!
}
}
bigtime_t
Worker::Timeout() const
{
return kWorkerTimeout;
}
status_t
Worker::Run(BJob* job)
{
return job->Run();
}
/*static*/ status_t
Worker::_Process(void* _self)
{
Worker* self = (Worker*)_self;
status_t status = self->Process();
delete self;
return status;
}
// #pragma mark -
MainWorker::MainWorker(JobQueue& queue)
:
Worker(queue)
{
// TODO: keep track of workers, and quit them on destruction
system_info info;
if (get_system_info(&info) == B_OK)
fCPUCount = info.cpu_count;
}
bigtime_t
MainWorker::Timeout() const
{
return B_INFINITE_TIMEOUT;
}
status_t
MainWorker::Run(BJob* job)
{
int32 count = atomic_get(&sWorkerCount);
size_t jobCount = fJobQueue.CountJobs();
if (jobCount > INT_MAX)
jobCount = INT_MAX;
if ((int32)jobCount > count && count < fCPUCount)
new Worker(fJobQueue);
return Worker::Run(job);
}
// #pragma mark -
LaunchDaemon::LaunchDaemon(status_t& error)
:
BServer(kLaunchDaemonSignature, NULL,

View File

@ -0,0 +1,106 @@
/*
* Copyright 2015, Axel Dörfler, axeld@pinc-software.de.
* Distributed under the terms of the MIT License.
*/
#include "Worker.h"
static const bigtime_t kWorkerTimeout = 1000000;
// One second until a worker thread quits without a job
static int32 sWorkerCount;
Worker::Worker(JobQueue& queue)
:
fJobQueue(queue)
{
fThread = spawn_thread(&Worker::_Process, "worker", B_NORMAL_PRIORITY,
this);
if (fThread >= 0 && resume_thread(fThread) == B_OK)
atomic_add(&sWorkerCount, 1);
}
Worker::~Worker()
{
}
status_t
Worker::Process()
{
while (true) {
BJob* job;
status_t status = fJobQueue.Pop(Timeout(), false, &job);
if (status != B_OK)
return status;
Run(job);
// TODO: proper error reporting on failed job!
}
}
bigtime_t
Worker::Timeout() const
{
return kWorkerTimeout;
}
status_t
Worker::Run(BJob* job)
{
return job->Run();
}
/*static*/ status_t
Worker::_Process(void* _self)
{
Worker* self = (Worker*)_self;
status_t status = self->Process();
delete self;
return status;
}
// #pragma mark -
MainWorker::MainWorker(JobQueue& queue)
:
Worker(queue)
{
// TODO: keep track of workers, and quit them on destruction
system_info info;
if (get_system_info(&info) == B_OK)
fCPUCount = info.cpu_count;
}
bigtime_t
MainWorker::Timeout() const
{
return B_INFINITE_TIMEOUT;
}
status_t
MainWorker::Run(BJob* job)
{
int32 count = atomic_get(&sWorkerCount);
size_t jobCount = fJobQueue.CountJobs();
if (jobCount > INT_MAX)
jobCount = INT_MAX;
if ((int32)jobCount > count && count < fCPUCount)
new Worker(fJobQueue);
return Worker::Run(job);
}

View File

@ -0,0 +1,49 @@
/*
* Copyright 2015, Axel Dörfler, axeld@pinc-software.de.
* Distributed under the terms of the MIT License.
*/
#ifndef WORKER_H
#define WORKER_H
#include <Job.h>
#include <JobQueue.h>
using namespace BSupportKit;
using BSupportKit::BPrivate::JobQueue;
class Worker {
public:
Worker(JobQueue& queue);
virtual ~Worker();
protected:
virtual status_t Process();
virtual bigtime_t Timeout() const;
virtual status_t Run(BJob* job);
private:
static status_t _Process(void* self);
protected:
thread_id fThread;
JobQueue& fJobQueue;
};
class MainWorker : public Worker {
public:
MainWorker(JobQueue& queue);
protected:
virtual bigtime_t Timeout() const;
virtual status_t Run(BJob* job);
private:
int32 fCPUCount;
};
#endif // WORKER_H