scheduler: Estimate the load a thread is able to produce

The previous implementation, based on the actual load of each core and the
share each thread has in that load, turned out to be very problematic when
balancing load on very heavily loaded systems (i.e. when there are more
threads consuming all available CPU time than there are logical CPUs).

The new approach is to estimate how much load a thread would produce if it
had all CPU time to itself. Summing such load estimates over the threads
assigned to a given core yields a rank that carries much more information
than the simple actual core load.
Pawel Dziepak 2014-01-08 22:59:04 +01:00
parent 772331c7cd
commit a2634874ed
10 changed files with 186 additions and 112 deletions
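A minimal sketch of the idea (not code from this commit; the real logic lives
in compute_load() and ThreadData::_ComputeNeededLoad() further down, and
kMaxLoad/bigtime_t come from the scheduler headers). The key point is that
time a thread spends waiting for a CPU is excluded from its measurement
window, so contention on an overloaded core cannot distort the estimate:

struct ThreadLoadSample {
    bigtime_t activeTime;       // time the thread actually ran
    bigtime_t availableTime;    // time it ran plus time it voluntarily slept
};

static inline int32
estimate_needed_load(const ThreadLoadSample& sample)
{
    if (sample.availableTime <= 0)
        return 0;
    // scaled to kMaxLoad (1000), the same scale compute_load() uses
    return int32(sample.activeTime * kMaxLoad / sample.availableTime);
}

A core's rank is then the sum of these estimates over all threads assigned to
it, which stays meaningful even when the core is already saturated.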


@ -15,10 +15,9 @@ const bigtime_t kIntervalInaccuracy = kLoadMeasureInterval / 4;
static inline int32
compute_load(bigtime_t& measureTime, bigtime_t& measureActiveTime, int32& load)
compute_load(bigtime_t& measureTime, bigtime_t& measureActiveTime, int32& load,
bigtime_t now)
{
bigtime_t now = system_time();
if (measureTime == 0) {
measureTime = now;
return -1;

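The only change here is that compute_load() now takes the current time as a
parameter instead of reading system_time() itself. The reason becomes visible
in later hunks of this commit: callers measuring real CPU or interrupt load
keep passing the wall clock, while ThreadData passes its own "available time"
clock, which advances only while the thread runs or voluntarily sleeps, never
while it waits for a CPU. The two call sites, as they appear later in this
diff:

// wall-clock based CPU load, as before
compute_load(fMeasureTime, fMeasureActiveTime, fLoad, system_time());

// thread "needed load": the last argument is the thread's available-time
// clock rather than system_time()
compute_load(fLastMeasureAvailableTime, fMeasureAvailableActiveTime,
    fNeededLoad, fMeasureAvailableTime);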

@ -256,7 +256,7 @@ update_int_load(int i)
int32 oldLoad = sVectors[i].load;
compute_load(sVectors[i].last_measure_time, sVectors[i].last_measure_active,
sVectors[i].load);
sVectors[i].load, system_time());
if (oldLoad != sVectors[i].load)
atomic_add(&sVectors[i].assigned_cpu->load, sVectors[i].load - oldLoad);
@ -365,9 +365,13 @@ int_io_interrupt_handler(int vector, bool levelTriggered)
vectorLocker.Unlock();
cpu_ent* cpu = get_cpu_struct();
cpu->interrupt_time += deltaTime;
if (sVectors[vector].type == INTERRUPT_TYPE_IRQ)
cpu->irq_time += deltaTime;
if (sVectors[vector].type == INTERRUPT_TYPE_IRQ
|| sVectors[vector].type == INTERRUPT_TYPE_ICI
|| sVectors[vector].type == INTERRUPT_TYPE_LOCAL_IRQ) {
cpu->interrupt_time += deltaTime;
if (sVectors[vector].type == INTERRUPT_TYPE_IRQ)
cpu->irq_time += deltaTime;
}
update_int_load(vector);


@ -78,13 +78,13 @@ should_rebalance(const ThreadData* threadData)
SCHEDULER_ENTER_FUNCTION();
int32 coreLoad = threadData->Core()->GetLoad();
int32 threadLoad = threadData->GetLoad() / threadData->Core()->CPUCount();
// If the thread produces more than 50% of the load, leave it here. In
// such situation it is better to move other threads away.
if (threadData->GetLoad() >= coreLoad / 2)
if (threadLoad >= coreLoad / 2)
return false;
int32 threadLoad = threadData->GetLoad();
int32 coreNewLoad = coreLoad - threadLoad;
// If there is high load on this core but this thread does not contribute
@ -160,11 +160,11 @@ rebalance_irqs(bool idle)
scheduler_mode_operations gSchedulerLowLatencyMode = {
"low latency",
2000,
100,
{ 2, 25 },
1000,
50,
{ 2, 50 },
10000,
50000,
switch_to_mode,
set_cpu_enabled,

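The first hunk above adjusts how should_rebalance() weighs a single thread: a
thread's needed load is estimated relative to one CPU having all its time,
while the core load it is compared against is (assuming CoreEntry::GetLoad()
divides the summed needed load by the CPU count) a per-logical-CPU figure, so
the thread's contribution must first be divided by the core's CPU count. The
power saving mode below gets the same adjustment. A condensed sketch of the
check:

int32 coreLoad = threadData->Core()->GetLoad();
    // per-logical-CPU load of the core (assumption, see above)
int32 threadLoad = threadData->GetLoad() / threadData->Core()->CPUCount();
    // this thread's share of that per-CPU figure

// If the thread alone produces at least half of the core's load, keep it
// here; it is cheaper to move the other threads away.
if (threadLoad >= coreLoad / 2)
    return false;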

@ -123,7 +123,7 @@ should_rebalance(const ThreadData* threadData)
CoreEntry* core = threadData->Core();
int32 coreLoad = core->GetLoad();
int32 threadLoad = threadData->GetLoad();
int32 threadLoad = threadData->GetLoad() / core->CPUCount();
if (coreLoad > kHighLoad) {
if (sSmallTaskCore == core) {
sSmallTaskCore = NULL;


@ -50,8 +50,8 @@ scheduler_mode gCurrentModeID;
scheduler_mode_operations* gCurrentMode;
bool gSingleCore;
bool gCPUFrequencyManagement;
bool gTrackLoad;
bool gTrackCoreLoad;
bool gTrackCPULoad;
} // namespace Scheduler
@ -388,23 +388,33 @@ reschedule(int32 nextState)
case B_THREAD_READY:
enqueueOldThread = true;
if (oldThreadData->HasQuantumEnded(oldThread->cpu->preempted,
oldThread->has_yielded)) {
TRACE("enqueueing thread %ld into run queue priority = %ld\n",
oldThread->id, oldThreadData->GetEffectivePriority());
putOldThreadAtBack = true;
} else {
TRACE("putting thread %ld back in run queue priority = %ld\n",
oldThread->id, oldThreadData->GetEffectivePriority());
putOldThreadAtBack = false;
if (!thread_is_idle_thread(oldThread)) {
oldThreadData->Continues();
if (oldThreadData->HasQuantumEnded(oldThread->cpu->preempted,
oldThread->has_yielded)) {
TRACE("enqueueing thread %ld into run queue priority ="
" %ld\n", oldThread->id,
oldThreadData->GetEffectivePriority());
putOldThreadAtBack = true;
} else {
TRACE("putting thread %ld back in run queue priority ="
" %ld\n", oldThread->id,
oldThreadData->GetEffectivePriority());
putOldThreadAtBack = false;
}
}
break;
case THREAD_STATE_FREE_ON_RESCHED:
oldThreadData->Dies();
if (gCPU[thisCPU].disabled)
oldThreadData->UnassignCore(true);
break;
default:
oldThreadData->GoesAway();
if (gCPU[thisCPU].disabled)
oldThreadData->UnassignCore(true);
TRACE("not enqueueing thread %ld into run queue next_state = %ld\n",
oldThread->id, nextState);
break;
@ -422,6 +432,7 @@ reschedule(int32 nextState)
cpu->Remove(nextThreadData);
putOldThreadAtBack = oldThread->pinned_to_cpu == 0;
oldThreadData->UnassignCore(true);
} else
nextThreadData = oldThreadData;
} else {
@ -476,6 +487,8 @@ reschedule(int32 nextState)
bigtime_t quantum = nextThreadData->ComputeQuantum();
add_timer(quantumTimer, &reschedule_event, quantum,
B_ONE_SHOT_RELATIVE_TIMER);
nextThreadData->Continues();
} else
gCurrentMode->rebalance_irqs(true);
nextThreadData->StartQuantum();
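The reschedule() changes above hook the new load estimation into the thread
lifecycle. A condensed, illustrative sketch of the dispatch (in the actual
code these calls are spread across the switch statement and the quantum-timer
branch shown above):

static void
track_old_thread(ThreadData* thread, int32 nextState)
{
    switch (nextState) {
        case B_THREAD_READY:
            // still runnable (idle threads skipped): refresh the estimate
            thread->Continues();
            break;
        case THREAD_STATE_FREE_ON_RESCHED:
            // exiting: its needed load leaves the core for good
            thread->Dies();
            break;
        default:
            // blocking: remember when it went away and subtract its
            // needed load from the core
            thread->GoesAway();
            break;
    }
}

The thread chosen to run next gets a matching Continues() call once its
quantum timer has been armed, and threads on a disabled CPU are additionally
unassigned from their core.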
@ -696,12 +709,12 @@ init()
// disable parts of the scheduler logic that are not needed
gSingleCore = coreCount == 1;
gCPUFrequencyManagement = increase_cpu_performance(0) == B_OK;
gTrackLoad = !gSingleCore || gCPUFrequencyManagement;
dprintf("scheduler switches: single core: %s, cpufreq: %s, load tracking:"
" %s\n", gSingleCore ? "true" : "false",
gCPUFrequencyManagement ? "true" : "false",
gTrackLoad ? "true" : "false");
gTrackCPULoad = increase_cpu_performance(0) == B_OK;
gTrackCoreLoad = !gSingleCore || gTrackCPULoad;
dprintf("scheduler switches: single core: %s, cpu load tracking: %s,"
" core load tracking: %s\n", gSingleCore ? "true" : "false",
gTrackCPULoad ? "true" : "false",
gTrackCoreLoad ? "true" : "false");
gCoreCount = coreCount;
gPackageCount = packageCount;
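The single gTrackLoad switch is split in two because the new per-thread load
estimation is needed in more situations than CPU frequency management.
Roughly (restating the hunk with the reasoning spelled out):

gTrackCPULoad = increase_cpu_performance(0) == B_OK;
    // per-CPU load, only needed when CPU frequency can actually be managed
gTrackCoreLoad = !gSingleCore || gTrackCPULoad;
    // per-thread needed load summed per core, needed whenever there is more
    // than one core to balance between, or when cpufreq wants load data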


@ -43,8 +43,8 @@ const int kVeryHighLoad = (kMaxLoad + kHighLoad) / 2;
const int kLoadDifference = kMaxLoad * 20 / 100;
extern bool gSingleCore;
extern bool gCPUFrequencyManagement;
extern bool gTrackLoad;
extern bool gTrackCoreLoad;
extern bool gTrackCPULoad;
void init_debug_commands();


@ -190,19 +190,15 @@ CPUEntry::ComputeLoad()
{
SCHEDULER_ENTER_FUNCTION();
ASSERT(gTrackLoad);
ASSERT(gTrackCPULoad);
ASSERT(!gCPU[fCPUNumber].disabled);
ASSERT(fCPUNumber == smp_get_current_cpu());
int oldLoad = compute_load(fMeasureTime, fMeasureActiveTime, fLoad);
int oldLoad = compute_load(fMeasureTime, fMeasureActiveTime, fLoad,
system_time());
if (oldLoad < 0)
return;
if (oldLoad != fLoad) {
int32 delta = fLoad - oldLoad;
fCore->UpdateLoad(delta);
}
if (fLoad > kVeryHighLoad)
gCurrentMode->rebalance_irqs(false);
}
@ -272,11 +268,10 @@ CPUEntry::TrackActivity(ThreadData* oldThreadData, ThreadData* nextThreadData)
oldThreadData->UpdateActivity(active);
}
if (gTrackLoad) {
oldThreadData->ComputeLoad();
nextThreadData->ComputeLoad();
if (gTrackCPULoad) {
if (!cpuEntry->disabled)
ComputeLoad();
_RequestPerformanceLevel(nextThreadData);
}
Thread* nextThread = nextThreadData->GetThread();
@ -285,9 +280,6 @@ CPUEntry::TrackActivity(ThreadData* oldThreadData, ThreadData* nextThreadData)
cpuEntry->last_user_time = nextThread->user_time;
nextThreadData->SetLastInterruptTime(cpuEntry->interrupt_time);
if (gCPUFrequencyManagement)
_RequestPerformanceLevel(nextThreadData);
}
}
@ -365,7 +357,8 @@ CoreEntry::CoreEntry()
fThreadCount(0),
fActiveTime(0),
fLoad(0),
fHighLoad(false)
fHighLoad(false),
fLastLoadUpdate(0)
{
B_INITIALIZE_SPINLOCK(&fCPULock);
B_INITIALIZE_SPINLOCK(&fQueueLock);
@ -425,20 +418,24 @@ CoreEntry::UpdateLoad(int32 delta)
{
SCHEDULER_ENTER_FUNCTION();
if (fCPUCount == 0) {
fLoad = 0;
return;
}
ASSERT(gTrackCoreLoad);
atomic_add(&fLoad, delta);
WriteSpinLocker coreLocker(gCoreHeapsLock);
bigtime_t now = system_time();
if (now < kLoadMeasureInterval + fLastLoadUpdate)
return;
if (!try_acquire_write_spinlock(&gCoreHeapsLock))
return;
WriteSpinLocker coreLocker(gCoreHeapsLock, true);
fLastLoadUpdate = now;
int32 newKey = GetLoad();
int32 oldKey = CoreLoadHeap::GetKey(this);
ASSERT(oldKey >= 0 && oldKey <= kMaxLoad);
ASSERT(newKey >= 0 && newKey <= kMaxLoad);
ASSERT(oldKey >= 0);
ASSERT(newKey >= 0);
if (oldKey == newKey)
return;
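Because old and new lines are interleaved above, the new throttling in
CoreEntry::UpdateLoad() is easier to read in isolation. The added logic
(lightly rearranged from the hunk, not a verbatim copy) accumulates the delta
atomically, but re-keys the core in the load heaps at most once per
kLoadMeasureInterval and backs off instead of spinning when the heap lock is
contended:

atomic_add(&fLoad, delta);

bigtime_t now = system_time();
if (now < kLoadMeasureInterval + fLastLoadUpdate)
    return;     // heap key was refreshed recently enough
if (!try_acquire_write_spinlock(&gCoreHeapsLock))
    return;     // lock is contended; try again on a later update
WriteSpinLocker coreLocker(gCoreHeapsLock, true);

fLastLoadUpdate = now;

int32 newKey = GetLoad();
int32 oldKey = CoreLoadHeap::GetKey(this);
if (oldKey == newKey)
    return;
// ...otherwise move this core within gCoreLoadHeap/gCoreHighLoadHeap...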
@ -502,6 +499,9 @@ CoreEntry::RemoveCPU(CPUEntry* cpu, ThreadProcessing& threadPostProcessing)
fIdleCPUCount--;
if (--fCPUCount == 0) {
// unassign threads
thread_map(CoreEntry::_UnassignThread, this);
// core has been disabled
if (fHighLoad) {
gCoreHighLoadHeap.ModifyKey(this, -1);
@ -516,8 +516,6 @@ CoreEntry::RemoveCPU(CPUEntry* cpu, ThreadProcessing& threadPostProcessing)
fPackage->RemoveIdleCore(this);
// get rid of threads
thread_map(CoreEntry::_UnassignThread, this);
while (fRunQueue.PeekMaximum() != NULL) {
ThreadData* threadData = fRunQueue.PeekMaximum();
@ -535,7 +533,6 @@ CoreEntry::RemoveCPU(CPUEntry* cpu, ThreadProcessing& threadPostProcessing)
fCPUHeap.RemoveRoot();
ASSERT(cpu->GetLoad() >= 0 && cpu->GetLoad() <= kMaxLoad);
fLoad -= cpu->GetLoad();
ASSERT(fLoad >= 0);
}
@ -565,9 +562,8 @@ CoreLoadHeap::Dump()
while (entry) {
int32 key = GetKey(entry);
int32 activeCPUs = entry->CPUCount() - entry->IdleCPUCount();
kprintf("%4" B_PRId32 " %3" B_PRId32 "%% %7" B_PRId32 "\n", entry->ID(),
entry->GetLoad() / 10, entry->ThreadCount() + activeCPUs);
entry->GetLoad() / 10, entry->ThreadCount());
RemoveMinimum();
sDebugCoreHeap.Insert(entry, key);
@ -685,8 +681,6 @@ dump_run_queue(int /* argc */, char** /* argv */)
static int
dump_cpu_heap(int /* argc */, char** /* argv */)
{
kprintf("Total ready threads: %" B_PRId32 "\n\n", gReadyThreadCount);
kprintf("core load threads\n");
gCoreLoadHeap.Dump();
kprintf("\n");


@ -123,16 +123,13 @@ public:
inline PackageEntry* Package() const { return fPackage; }
inline int32 CPUCount() const
{ return fCPUCount; }
inline int32 IdleCPUCount() const
{ return fIdleCPUCount; }
inline void LockCPUHeap();
inline void UnlockCPUHeap();
inline CPUPriorityHeap* CPUHeap();
inline int32 ThreadCount() const
{ return fThreadCount; }
inline int32 ThreadCount() const;
inline void LockRunQueue();
inline void UnlockRunQueue();
@ -188,6 +185,7 @@ private:
int32 fLoad;
bool fHighLoad;
bigtime_t fLastLoadUpdate;
friend class DebugDumper;
} CACHE_LINE_ALIGN;
@ -332,6 +330,14 @@ CoreEntry::CPUHeap()
}
inline int32
CoreEntry::ThreadCount() const
{
SCHEDULER_ENTER_FUNCTION();
return fThreadCount + fCPUCount - fIdleCPUCount;
}
inline void
CoreEntry::LockRunQueue()
{


@ -6,14 +6,6 @@
#include "scheduler_thread.h"
namespace Scheduler {
int32 gReadyThreadCount;
} // namespace Scheduler
using namespace Scheduler;
@ -33,9 +25,11 @@ ThreadData::_InitBase()
fTimeLeft = 0;
fStolenTime = 0;
fMeasureActiveTime = 0;
fMeasureTime = 0;
fLoad = 0;
fMeasureAvailableActiveTime = 0;
fLastMeasureAvailableTime = 0;
fMeasureAvailableTime = 0;
fNeededLoad = 0;
fWentSleep = 0;
fWentSleepActive = 0;
@ -43,6 +37,7 @@ ThreadData::_InitBase()
fWentSleepCountIdle = 0;
fEnqueued = false;
fReady = false;
}
@ -113,6 +108,14 @@ ThreadData::Init()
ThreadData* currentThreadData = thread_get_current_thread()->scheduler_data;
fCore = currentThreadData->fCore;
if (fThread->priority < B_FIRST_REAL_TIME_PRIORITY) {
fPriorityPenalty = std::min(currentThreadData->fPriorityPenalty,
std::max(fThread->priority - _GetMinimalPriority(), int32(0)));
fAdditionalPenalty = currentThreadData->fAdditionalPenalty;
_ComputeEffectivePriority();
}
}
@ -122,6 +125,7 @@ ThreadData::Init(CoreEntry* core)
_InitBase();
fCore = core;
fReady = true;
}
@ -155,7 +159,7 @@ ThreadData::Dump() const
kprintf("\tstolen_time:\t\t%" B_PRId64 " us\n", fStolenTime);
kprintf("\tquantum_start:\t\t%" B_PRId64 " us\n", fQuantumStart);
kprintf("\tload:\t\t\t%" B_PRId32 "%%\n", fLoad / 10);
kprintf("\tneeded_load:\t\t%" B_PRId32 "%%\n", fNeededLoad / 10);
kprintf("\twent_sleep:\t\t%" B_PRId64 "\n", fWentSleep);
kprintf("\twent_sleep_active:\t%" B_PRId64 "\n", fWentSleepActive);
kprintf("\twent_sleep_count:\t%" B_PRId32 "\n", fWentSleepCount);
@ -185,21 +189,16 @@ ThreadData::ChooseCoreAndCPU(CoreEntry*& targetCore, CPUEntry*& targetCPU)
ASSERT(targetCore != NULL);
ASSERT(targetCPU != NULL);
if (fReady && fCore != targetCore && fCore != NULL) {
fCore->UpdateLoad(-fNeededLoad);
targetCore->UpdateLoad(fNeededLoad);
}
fCore = targetCore;
return rescheduleNeeded;
}
void
ThreadData::ComputeLoad()
{
SCHEDULER_ENTER_FUNCTION();
ASSERT(gTrackLoad);
compute_load(fMeasureTime, fMeasureActiveTime, fLoad);
}
bigtime_t
ThreadData::ComputeQuantum()
{
@ -272,6 +271,21 @@ ThreadData::_GetPenalty() const
}
void
ThreadData::_ComputeNeededLoad()
{
SCHEDULER_ENTER_FUNCTION();
int32 oldLoad = compute_load(fLastMeasureAvailableTime,
fMeasureAvailableActiveTime, fNeededLoad, fMeasureAvailableTime);
if (oldLoad < 0 || oldLoad == fNeededLoad)
return;
int32 delta = fNeededLoad - oldLoad;
fCore->UpdateLoad(delta);
}
void
ThreadData::_ComputeEffectivePriority() const
{


@ -56,6 +56,7 @@ public:
{ fLastInterruptTime = interruptTime; }
inline void SetStolenInterruptTime(bigtime_t interruptTime);
inline void Continues();
inline void GoesAway();
inline void Dies();
@ -67,7 +68,6 @@ public:
inline bool Dequeue();
inline void UpdateActivity(bigtime_t active);
void ComputeLoad();
inline bool HasQuantumEnded(bool wasPreempted, bool hasYielded);
bigtime_t ComputeQuantum();
@ -77,10 +77,10 @@ public:
inline void SetDequeued() { fEnqueued = false; }
inline Thread* GetThread() const { return fThread; }
inline int32 GetLoad() const { return fLoad; }
inline int32 GetLoad() const { return fNeededLoad; }
inline CoreEntry* Core() const { return fCore; }
inline void UnassignCore() { fCore = NULL; }
inline void UnassignCore(bool running = false);
static void ComputeQuantumLengths();
@ -88,6 +88,8 @@ private:
inline void _IncreasePenalty(bool strong);
inline int32 _GetPenalty() const;
void _ComputeNeededLoad();
void _ComputeEffectivePriority() const;
static bigtime_t _ScaleQuantum(bigtime_t maxQuantum,
@ -104,6 +106,7 @@ private:
int32 fWentSleepCountIdle;
bool fEnqueued;
bool fReady;
Thread* fThread;
@ -116,9 +119,11 @@ private:
bigtime_t fTimeLeft;
bigtime_t fMeasureActiveTime;
bigtime_t fMeasureTime;
int32 fLoad;
bigtime_t fMeasureAvailableActiveTime;
bigtime_t fMeasureAvailableTime;
bigtime_t fLastMeasureAvailableTime;
int32 fNeededLoad;
CoreEntry* fCore;
};
@ -130,8 +135,6 @@ public:
virtual void operator()(ThreadData* thread) = 0;
};
extern int32 gReadyThreadCount;
inline int32
ThreadData::_GetMinimalPriority() const
@ -235,8 +238,6 @@ ThreadData::ShouldCancelPenalty() const
if (fCore == NULL)
return false;
if (system_time() - fWentSleep > gCurrentMode->minimal_quantum * 2)
return false;
if (GetEffectivePriority() != B_LOWEST_ACTIVE_PRIORITY
&& !IsCPUBound()) {
@ -255,7 +256,17 @@ ThreadData::SetStolenInterruptTime(bigtime_t interruptTime)
interruptTime -= fLastInterruptTime;
fStolenTime += interruptTime;
fMeasureActiveTime -= interruptTime;
}
inline void
ThreadData::Continues()
{
SCHEDULER_ENTER_FUNCTION();
ASSERT(fReady);
if (gTrackCoreLoad)
_ComputeNeededLoad();
}
@ -264,6 +275,8 @@ ThreadData::GoesAway()
{
SCHEDULER_ENTER_FUNCTION();
ASSERT(fReady);
if (!fReceivedPenalty)
_IncreasePenalty(false);
fHasSlept = true;
@ -275,7 +288,9 @@ ThreadData::GoesAway()
fWentSleepCountIdle = fCore->StarvationCounterIdle();
fWentSleepActive = fCore->GetActiveTime();
atomic_add(&gReadyThreadCount, -1);
if (gTrackCoreLoad)
fCore->UpdateLoad(-fNeededLoad);
fReady = false;
}
@ -283,7 +298,11 @@ inline void
ThreadData::Dies()
{
SCHEDULER_ENTER_FUNCTION();
atomic_add(&gReadyThreadCount, -1);
ASSERT(fReady);
if (gTrackCoreLoad)
fCore->UpdateLoad(-fNeededLoad);
fReady = false;
}
@ -292,9 +311,6 @@ ThreadData::PutBack()
{
SCHEDULER_ENTER_FUNCTION();
if (gTrackLoad)
ComputeLoad();
int32 priority = GetEffectivePriority();
if (fThread->pinned_to_cpu > 0) {
@ -321,14 +337,19 @@ ThreadData::Enqueue()
{
SCHEDULER_ENTER_FUNCTION();
if (fThread->state != B_THREAD_READY && fThread->state != B_THREAD_RUNNING)
atomic_add(&gReadyThreadCount, 1);
if (!fReady) {
ASSERT(system_time() - fWentSleep > 0);
if (gTrackCoreLoad) {
fMeasureAvailableTime += system_time() - fWentSleep;
fCore->UpdateLoad(fNeededLoad);
_ComputeNeededLoad();
}
fReady = true;
}
fThread->state = B_THREAD_READY;
if (gTrackLoad)
ComputeLoad();
int32 priority = GetEffectivePriority();
if (fThread->pinned_to_cpu > 0) {
@ -381,7 +402,12 @@ inline void
ThreadData::UpdateActivity(bigtime_t active)
{
SCHEDULER_ENTER_FUNCTION();
fMeasureActiveTime += active;
if (!gTrackCoreLoad)
return;
fMeasureAvailableTime += active;
fMeasureAvailableActiveTime += active;
}
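UpdateActivity() and Enqueue() together define what counts as "available"
time: actual running time goes into both counters, the just-finished sleep
period is added to the available time when the thread wakes up (Enqueue()
above), and time spent waiting in the run queue is counted nowhere. A worked
example with made-up numbers:

// one measurement interval: the thread runs 10 ms, sleeps 30 ms and waits
// 60 ms in the run queue of an overloaded core
bigtime_t running = 10000, sleeping = 30000, queued = 60000;

bigtime_t availableActive = running;        // UpdateActivity()
bigtime_t available = running + sleeping;   // UpdateActivity() + Enqueue()
// 'queued' is added to neither counter

int32 neededLoad = int32(availableActive * kMaxLoad / available);
// neededLoad == 250, i.e. 25% of kMaxLoad (1000): the load this thread would
// produce with a CPU to itself. A wall-clock measurement would report only
// 10% here, merely because the core is overloaded.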
@ -396,9 +422,8 @@ ThreadData::HasQuantumEnded(bool wasPreempted, bool hasYielded)
}
bigtime_t timeUsed = system_time() - fQuantumStart;
if (timeUsed > 0)
fTimeLeft -= timeUsed;
fTimeLeft = std::max(fTimeLeft, bigtime_t(0));
ASSERT(timeUsed >= 0);
fTimeLeft -= timeUsed;
// too little time left, it's better make the next quantum a bit longer
int32 skipTime = gCurrentMode->minimal_quantum;
@ -427,6 +452,25 @@ ThreadData::StartQuantum()
}
inline void
ThreadData::UnassignCore(bool running)
{
SCHEDULER_ENTER_FUNCTION();
ASSERT(fCore != NULL);
if (!fReady)
fCore = NULL;
if (running || fThread->state == B_THREAD_READY) {
if (gTrackCoreLoad)
fCore->UpdateLoad(-fNeededLoad);
fReady = false;
fThread->state = B_THREAD_SUSPENDED;
fCore = NULL;
}
}
} // namespace Scheduler