scheduler_affine: Migrate threads from overloaded cores

* Keep the number of CPU bound threads on each core balanced.
* If possible, migrate normal threads from cores with CPU bound ones to
  less busy cores (the rebalancing heuristic is sketched below).
Pawel Dziepak 2013-10-22 01:11:41 +02:00
parent 7aba623f52
commit 2e0ee59462
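
To make the policy above concrete, here is a minimal standalone sketch of the rebalancing heuristic that affine_should_rebalance() implements in the diff below. The counter names mirror the globals this commit introduces (sCPUBoundThreads, sAssignedThreads, sRunQueueCount); the pared-down CoreEntry, the leastLoaded parameter standing in for the sCoreThreadHeap root lookup, and main() are scaffolding for illustration only, not part of the commit.

// Sketch only: mirrors the decision logic of affine_should_rebalance(),
// with the heap lookup replaced by an explicit parameter.
#include <cstdint>
#include <cstdio>

struct CoreEntry {
	int32_t fThreads;          // non-idle threads assigned to this core
	int32_t fCPUBoundThreads;  // of those, how many are CPU bound
};

static int32_t sAssignedThreads = 16;  // all assigned threads, system wide
static int32_t sCPUBoundThreads = 4;   // all CPU bound threads, system wide
static int32_t sRunQueueCount = 4;     // one run queue per core

// Decide whether a thread should leave its current core on re-enqueue.
// 'leastLoaded' stands in for the thread count of the least loaded core,
// which the real code reads from the root of sCoreThreadHeap.
static bool
should_rebalance(const CoreEntry& core, bool cpuBound, int32_t leastLoaded)
{
	// CPU bound thread: leave if this core has noticeably more CPU bound
	// threads than the per-core average.
	if (cpuBound)
		return core.fCPUBoundThreads - sCPUBoundThreads / sRunQueueCount > 1;

	int32_t averageThreads = sAssignedThreads / sRunQueueCount;

	// Normal thread sharing a core with CPU bound ones: move away as soon
	// as some other core is at or below the average load.
	if (core.fCPUBoundThreads > 0)
		return leastLoaded <= averageThreads;

	// No CPU bound threads here: only rebalance when this core is well
	// above the average, to avoid needless migrations.
	const int32_t kBalanceThreshold = 3;
	return core.fThreads - averageThreads > kBalanceThreshold;
}

int
main()
{
	CoreEntry crowded = { 8, 2 };
	// Average load is 16 / 4 = 4; a normal thread on this crowded core
	// migrates because the least loaded core (3) is below the average,
	// while a CPU bound one stays (2 - 1 is not > 1).
	printf("migrate normal thread: %s\n",
		should_rebalance(crowded, false, 3) ? "yes" : "no");
	printf("migrate cpu bound thread: %s\n",
		should_rebalance(crowded, true, 3) ? "yes" : "no");
	return 0;
}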


@@ -89,6 +89,8 @@ typedef Heap<CoreEntry, int32, HeapLesserCompare<int32>,
 		HeapMemberGetLink<CoreEntry, int32, &CoreEntry::fThreadHeapLink> >
 	AffineCoreThreadHeap;
 static AffineCoreThreadHeap* sCoreThreadHeap;
+static int32 sCPUBoundThreads;
+static int32 sAssignedThreads;
 
 // sPackageUsageHeap is used to decide which core should be woken up from the
 // idle state. When aiming for performance we should use as many packages as
@@ -508,6 +510,7 @@ affine_increase_penalty(Thread* thread)
 	int32 core = schedulerThreadData->previous_core;
 	ASSERT(core >= 0);
 	if (schedulerThreadData->additional_penalty == 0) {
+		sCPUBoundThreads++;
 		sCoreEntries[core].fCPUBoundThreads++;
 		affine_update_thread_heaps(core);
 	}
@@ -526,14 +529,6 @@ affine_cancel_penalty(Thread* thread)
 	if (schedulerThreadData->priority_penalty != 0)
 		TRACE("cancelling thread %ld penalty\n", thread->id);
 
-	if (schedulerThreadData->additional_penalty != 0) {
-		int32 core = schedulerThreadData->previous_core;
-		ASSERT(core >= 0);
-
-		sCoreEntries[core].fCPUBoundThreads--;
-		affine_update_thread_heaps(core);
-	}
-
 	schedulerThreadData->priority_penalty = 0;
 	schedulerThreadData->additional_penalty = 0;
 }
@@ -679,30 +674,93 @@ affine_choose_cpu(int32 core)
 }
 
 
-static void
-affine_assign_thread_to_core(Thread* thread, int32 targetCore)
+static bool
+affine_should_rebalance(Thread* thread)
 {
-	scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
-
-	int32 oldCore = schedulerThreadData->previous_core;
-	if (oldCore == targetCore)
+	if (thread_is_idle_thread(thread))
+		return false;
+
+	scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
+	ASSERT(schedulerThreadData->previous_core >= 0);
+
+	CoreEntry* coreEntry = &sCoreEntries[schedulerThreadData->previous_core];
+
+	// If this is a cpu bound thread and we have significantly more such threads
+	// than the average get rid of this one.
+	if (schedulerThreadData->additional_penalty != 0) {
+		int32 averageCPUBound = sCPUBoundThreads / sRunQueueCount;
+		if (coreEntry->fCPUBoundThreads - averageCPUBound > 1)
+			return true;
+		return false;
+	}
+
+	// If this thread is not cpu bound but we have at least one consider giving
+	// this one to someone less busy.
+	int32 averageThread = sAssignedThreads / sRunQueueCount;
+	if (coreEntry->fCPUBoundThreads > 0) {
+		CoreEntry* other = sCoreThreadHeap->PeekRoot();
+		if (AffineCoreThreadHeap::GetKey(other) <= averageThread)
+			return true;
+	}
+
+	// No cpu bound threads - the situation is quite good. Make sure it
+	// won't get much worse...
+	const int32 kBalanceThreshold = 3;
+	return coreEntry->fThreads - averageThread > kBalanceThreshold;
+}
+
+
+static void
+affine_assign_active_thread_to_core(Thread* thread)
+{
+	if (thread_is_idle_thread(thread))
 		return;
 
-	if (oldCore >= 0) {
-		sCoreEntries[oldCore].fThreads--;
-		if (schedulerThreadData->additional_penalty != 0)
-			sCoreEntries[oldCore].fCPUBoundThreads--;
-
-		affine_update_thread_heaps(oldCore);
-	}
+	scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
+	ASSERT(schedulerThreadData->previous_core >= 0);
+	int32 core = schedulerThreadData->previous_core;
+
+	sCoreEntries[core].fThreads++;
+	sAssignedThreads++;
+	if (schedulerThreadData->additional_penalty != 0) {
+		sCoreEntries[core].fCPUBoundThreads++;
+		sCPUBoundThreads++;
+	}
 
-	schedulerThreadData->previous_core = targetCore;
-	if (targetCore >= 0) {
-		sCoreEntries[targetCore].fThreads++;
-		if (schedulerThreadData->additional_penalty != 0)
-			sCoreEntries[targetCore].fCPUBoundThreads++;
-
-		affine_update_thread_heaps(targetCore);
-	}
+	affine_update_thread_heaps(core);
 }
+
+
+static inline void
+affine_thread_goes_away(Thread* thread)
+{
+	if (thread_is_idle_thread(thread))
+		return;
+
+	scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
+	ASSERT(schedulerThreadData->previous_core >= 0);
+	int32 core = schedulerThreadData->previous_core;
+
+	ASSERT(sCoreEntries[core].fThreads > 0);
+	ASSERT(sCoreEntries[core].fThreads > sCoreEntries[core].fCPUBoundThreads
+		|| (sCoreEntries[core].fThreads == sCoreEntries[core].fCPUBoundThreads
+			&& schedulerThreadData->additional_penalty != 0));
+
+	sCoreEntries[core].fThreads--;
+	sAssignedThreads--;
+	if (schedulerThreadData->additional_penalty != 0) {
+		ASSERT(sCoreEntries[core].fCPUBoundThreads > 0);
+		sCoreEntries[core].fCPUBoundThreads--;
+		sCPUBoundThreads--;
+	}
+
+	affine_update_thread_heaps(core);
+
+	schedulerThreadData->went_sleep = system_time();
+	schedulerThreadData->went_sleep_active = sCoreEntries[core].fActiveTime;
+}
@@ -728,21 +786,31 @@ affine_enqueue(Thread* thread, bool newOne)
 		targetCPU = thread->previous_cpu->cpu_num;
 		targetCore = sCPUToCore[targetCPU];
 		ASSERT(targetCore == schedulerThreadData->previous_core);
+
+		if (newOne)
+			affine_assign_active_thread_to_core(thread);
 	} else if (schedulerThreadData->previous_core < 0
-		|| (newOne && affine_has_cache_expired(thread))) {
+		|| (newOne && affine_has_cache_expired(thread))
+		|| affine_should_rebalance(thread)) {
 		if (thread_is_idle_thread(thread)) {
 			targetCPU = thread->previous_cpu->cpu_num;
 			targetCore = sCPUToCore[targetCPU];
 		} else {
+			if (!newOne)
+				affine_thread_goes_away(thread);
+
 			targetCore = affine_choose_core(threadPriority);
 			targetCPU = affine_choose_cpu(targetCore);
 		}
 
-		affine_assign_thread_to_core(thread, targetCore);
+		schedulerThreadData->previous_core = targetCore;
+		affine_assign_active_thread_to_core(thread);
 	} else {
 		targetCore = schedulerThreadData->previous_core;
 		targetCPU = affine_choose_cpu(targetCore);
+
+		if (newOne)
+			affine_assign_active_thread_to_core(thread);
 	}
 
 	TRACE("enqueueing thread %ld with priority %ld %ld\n", thread->id,
@@ -815,26 +883,6 @@ affine_put_back(Thread* thread)
 
 
 #if 0
-/*! Dequeues the thread after the given \a prevThread from the run queue.
-*/
-static inline Thread *
-dequeue_from_run_queue(Thread *prevThread, int32 currentCPU)
-{
-	Thread *resultThread = NULL;
-	if (prevThread != NULL) {
-		resultThread = prevThread->queue_next;
-		prevThread->queue_next = resultThread->queue_next;
-	} else {
-		resultThread = sRunQueue[currentCPU];
-		sRunQueue[currentCPU] = resultThread->queue_next;
-	}
-
-	sRunQueueSize[currentCPU]--;
-	resultThread->scheduler_data->fLastQueue = -1;
-
-	return resultThread;
-}
-
 /*! Looks for a possible thread to grab/run from another CPU.
 	Note: thread lock must be held when entering this function
 */
@@ -899,12 +947,17 @@ affine_set_thread_priority(Thread *thread, int32 priority)
 		thread->id, priority, thread->priority,
 		affine_get_effective_priority(thread));
 
-	if (thread->state == B_THREAD_RUNNING)
+	if (thread->state == B_THREAD_RUNNING) {
+		affine_thread_goes_away(thread);
 		affine_update_priority_heaps(thread->cpu->cpu_num, priority);
+	}
 
 	if (thread->state != B_THREAD_READY) {
 		affine_cancel_penalty(thread);
 		thread->priority = priority;
+
+		if (thread->state == B_THREAD_RUNNING)
+			affine_assign_active_thread_to_core(thread);
+
 		return;
 	}
@@ -921,6 +974,7 @@ affine_set_thread_priority(Thread *thread, int32 priority)
 	int32 previousCore = thread->scheduler_data->previous_core;
 	ASSERT(previousCore >= 0);
 	sRunQueues[previousCore].Remove(thread);
+	affine_thread_goes_away(thread);
 
 	// set priority and re-insert
 	affine_cancel_penalty(thread);
@@ -1087,15 +1141,6 @@ affine_track_cpu_activity(Thread* oldThread, Thread* nextThread, int32 thisCore)
 }
 
 
-static inline void
-affine_thread_goes_sleep(Thread* thread, int32 thisCore)
-{
-	scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
-
-	schedulerThreadData->went_sleep = system_time();
-	schedulerThreadData->went_sleep_active = sCoreEntries[thisCore].fActiveTime;
-}
-
 /*! Runs the scheduler.
 	Note: expects thread spinlock to be held
 */
@@ -1142,14 +1187,14 @@ affine_reschedule(void)
 			break;
 		case B_THREAD_SUSPENDED:
-			affine_thread_goes_sleep(oldThread, thisCore);
+			affine_thread_goes_away(oldThread);
 			TRACE("reschedule(): suspending thread %ld\n", oldThread->id);
 			break;
 		case THREAD_STATE_FREE_ON_RESCHED:
-			affine_assign_thread_to_core(oldThread, -1);
+			affine_thread_goes_away(oldThread);
 			break;
 		default:
-			affine_thread_goes_sleep(oldThread, thisCore);
+			affine_thread_goes_away(oldThread);
 			TRACE("not enqueueing thread %ld into run queue next_state = %ld\n",
 				oldThread->id, oldThread->next_state);
 			break;