scheduler: Use core load to distribute threads

Author: Pawel Dziepak
Date:   2013-10-28 00:39:16 +01:00
parent d80cdf504f
commit dc38e6ca87

@@ -56,6 +56,8 @@ const bigtime_t kMinimalWaitTime = kThreadQuantum / 4;
 const bigtime_t kCacheExpire = 100000;
+const bigtime_t kHighLoad = 600;
 static int sDisableSmallTaskPacking;
 static int32 sSmallTaskCore;
@@ -85,7 +87,7 @@ static CPUHeap* sCPUPriorityHeaps;
 struct CoreEntry : public DoublyLinkedListLinkImpl<CoreEntry> {
     HeapLink<CoreEntry, int32> fPriorityHeapLink;
-    MinMaxHeapLink<CoreEntry, int32> fThreadHeapLink;
+    MinMaxHeapLink<CoreEntry, int> fLoadHeapLink;
     int32 fCoreID;
@@ -96,9 +98,6 @@ struct CoreEntry : public DoublyLinkedListLinkImpl<CoreEntry> {
     bigtime_t fActiveTime;
-    int32 fCPUBoundThreads;
-    int32 fThreads;
     int fLoad;
 };
@@ -108,14 +107,11 @@ typedef Heap<CoreEntry, int32, HeapLesserCompare<int32>,
     CorePriorityHeap;
 static CorePriorityHeap* sCorePriorityHeap;
-typedef MinMaxHeap<CoreEntry, int32, MinMaxHeapCompare<int32>,
-    MinMaxHeapMemberGetLink<CoreEntry, int32, &CoreEntry::fThreadHeapLink> >
-    CoreThreadHeap;
-static CoreThreadHeap* sCoreThreadHeap;
-static CoreThreadHeap* sCoreCPUBoundThreadHeap;
-static int32 sCPUBoundThreads;
-static int32 sAssignedThreads;
+typedef MinMaxHeap<CoreEntry, int, MinMaxHeapCompare<int>,
+    MinMaxHeapMemberGetLink<CoreEntry, int, &CoreEntry::fLoadHeapLink> >
+    CoreLoadHeap;
+static CoreLoadHeap* sCoreLoadHeap;
+static CoreLoadHeap* sCoreHighLoadHeap;
 
 // sPackageUsageHeap is used to decide which core should be woken up from the
 // idle state. When aiming for performance we should use as many packages as
@@ -201,7 +197,7 @@ scheduler_thread_data::Init()
 static inline int
 get_minimal_priority(Thread* thread)
 {
-    return min_c(thread->priority, 25) / 5;
+    return max_c(min_c(thread->priority, 25) / 5, 1);
 }
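
The max_c() clamp gives get_minimal_priority() a floor of 1. Priorities 1
through 4 previously mapped to a floor of 0, which is the priority of the
idle threads, so a fully penalized low-priority thread could in principle be
pushed down to the idle priority. A standalone sketch of the change, with
std::min()/std::max() standing in for Haiku's min_c()/max_c() macros:

    // floor_old/floor_new model the return value of get_minimal_priority()
    #include <algorithm>
    #include <cstdio>

    static int floor_old(int priority)
    {
        return std::min(priority, 25) / 5;
    }

    static int floor_new(int priority)
    {
        // never below 1: priority 0 is reserved for the idle threads
        return std::max(std::min(priority, 25) / 5, 1);
    }

    int main()
    {
        for (int priority : { 1, 4, 5, 10, 25, 120 }) {
            std::printf("%3d: old %d, new %d\n", priority,
                floor_old(priority), floor_new(priority));
        }
        return 0;
    }

For priorities 1 and 4 this prints "old 0" but "new 1"; from priority 25 up
both floors are 5, so high-priority threads are unaffected.
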
@@ -315,17 +311,16 @@ dump_heap(CPUHeap* heap)
 static void
-dump_core_thread_heap(CoreThreadHeap* heap)
+dump_core_load_heap(CoreLoadHeap* heap)
 {
-    CoreThreadHeap temp(sRunQueueCount);
+    CoreLoadHeap temp(sRunQueueCount);
     int32 cpuPerCore = smp_get_num_cpus() / sRunQueueCount;
 
     CoreEntry* entry = heap->PeekMinimum();
     while (entry) {
-        int32 key = CoreThreadHeap::GetKey(entry);
-        kprintf("%4" B_PRId32 " %6" B_PRId32 " %7" B_PRId32 " %9" B_PRId32
-            " %3d%%\n", entry->fCoreID, key, entry->fThreads,
-            entry->fCPUBoundThreads, entry->fLoad / cpuPerCore / 10);
+        int key = CoreLoadHeap::GetKey(entry);
+        kprintf("%4" B_PRId32 " %3" B_PRId32 "%%\n", entry->fCoreID,
+            entry->fLoad / cpuPerCore / 10);
         heap->RemoveMinimum();
         temp.Insert(entry, key);
@@ -335,7 +330,7 @@ dump_core_thread_heap(CoreThreadHeap* heap)
     entry = temp.PeekMinimum();
     while (entry) {
-        int32 key = CoreThreadHeap::GetKey(entry);
+        int key = CoreLoadHeap::GetKey(entry);
         temp.RemoveMinimum();
         heap->Insert(entry, key);
         entry = temp.PeekMinimum();
@@ -373,9 +368,10 @@ dump_cpu_heap(int argc, char** argv)
         entry = temp.PeekRoot();
     }
 
-    kprintf("\ncore key threads cpu-bound load\n");
-    dump_core_thread_heap(sCoreThreadHeap);
-    dump_core_thread_heap(sCoreCPUBoundThreadHeap);
+    kprintf("\ncore load\n");
+    dump_core_load_heap(sCoreLoadHeap);
+    kprintf("---------\n");
+    dump_core_load_heap(sCoreHighLoadHeap);
 
     for (int32 i = 0; i < sRunQueueCount; i++) {
         kprintf("\nCore %" B_PRId32 " heap:\n", i);
@@ -517,43 +513,40 @@ scheduler_dump_thread_data(Thread* thread)
 static void
-update_thread_heaps(int32 core)
+update_load_heaps(int32 core)
 {
     ASSERT(!sSingleCore);
 
     CoreEntry* entry = &sCoreEntries[core];
 
-    ASSERT(entry->fCPUBoundThreads >= 0
-        && entry->fCPUBoundThreads <= entry->fThreads);
-    ASSERT(entry->fThreads >= 0
-        && entry->fThreads <= thread_max_threads());
+    int32 cpuPerCore = smp_get_num_cpus() / sRunQueueCount;
+    int newKey = entry->fLoad / cpuPerCore;
+    int oldKey = CoreLoadHeap::GetKey(entry);
-    int32 newKey = entry->fCPUBoundThreads * thread_max_threads();
-    newKey += entry->fThreads;
-    int32 oldKey = CoreThreadHeap::GetKey(entry);
+    ASSERT(oldKey >= 0 && oldKey <= 1000);
+    ASSERT(newKey >= 0 && newKey <= 1000);
 
     if (oldKey == newKey)
         return;
 
-    if (newKey > thread_max_threads()) {
-        if (oldKey <= thread_max_threads()) {
-            sCoreThreadHeap->ModifyKey(entry, -1);
-            ASSERT(sCoreThreadHeap->PeekMinimum() == entry);
-            sCoreThreadHeap->RemoveMinimum();
+    if (newKey > kHighLoad) {
+        if (oldKey <= kHighLoad) {
+            sCoreLoadHeap->ModifyKey(entry, -1);
+            ASSERT(sCoreLoadHeap->PeekMinimum() == entry);
+            sCoreLoadHeap->RemoveMinimum();
-            sCoreCPUBoundThreadHeap->Insert(entry, newKey);
+            sCoreHighLoadHeap->Insert(entry, newKey);
         } else
-            sCoreCPUBoundThreadHeap->ModifyKey(entry, newKey);
+            sCoreHighLoadHeap->ModifyKey(entry, newKey);
     } else {
-        if (oldKey > thread_max_threads()) {
-            sCoreCPUBoundThreadHeap->ModifyKey(entry, -1);
-            ASSERT(sCoreCPUBoundThreadHeap->PeekMinimum() == entry);
-            sCoreCPUBoundThreadHeap->RemoveMinimum();
+        if (oldKey > kHighLoad) {
+            sCoreHighLoadHeap->ModifyKey(entry, -1);
+            ASSERT(sCoreHighLoadHeap->PeekMinimum() == entry);
+            sCoreHighLoadHeap->RemoveMinimum();
-            sCoreThreadHeap->Insert(entry, newKey);
+            sCoreLoadHeap->Insert(entry, newKey);
         } else
-            sCoreThreadHeap->ModifyKey(entry, newKey);
+            sCoreLoadHeap->ModifyKey(entry, newKey);
     }
 }
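
update_load_heaps() replaces the thread-count bookkeeping with the measured
load: the heap key is the core's total load divided by the number of logical
CPUs on it, on the same 0-1000 scale the new ASSERTs enforce (the debugger
command divides by 10 to print a percentage). Cores at or below kHighLoad
(600, i.e. 60%) live in sCoreLoadHeap; cores above it move to
sCoreHighLoadHeap. A standalone model of the key computation and heap choice,
assuming that scale:

    #include <cstdio>

    const int kHighLoad = 600;

    // fLoad sums the per-CPU loads, so dividing by the number of logical
    // CPUs per core yields a key comparable across cores regardless of SMT
    static int core_key(int coreLoad, int cpuPerCore)
    {
        return coreLoad / cpuPerCore;
    }

    int main()
    {
        // two-way SMT core: one CPU saturated, the sibling nearly idle
        int key = core_key(1000 + 50, 2);
        std::printf("key %d -> %s\n", key,
            key > kHighLoad ? "sCoreHighLoadHeap" : "sCoreLoadHeap");
        // prints: key 525 -> sCoreLoadHeap
        return 0;
    }

The ModifyKey(entry, -1) followed by RemoveMinimum() is the idiom for
deleting an arbitrary entry from these heaps: forcing the key below any
legal load (loads are never negative) bubbles the entry to the minimum,
where it can be removed.
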
@@ -566,8 +559,8 @@ disable_small_task_packing(void)
     ASSERT(sDisableSmallTaskPacking == 0);
     ASSERT(sSmallTaskCore == sCPUToCore[smp_get_current_cpu()]);
-    ASSERT(sAssignedThreads > 0);
-    sDisableSmallTaskPacking = sAssignedThreads * 64;
+//  ASSERT(sAssignedThreads > 0);
+//  sDisableSmallTaskPacking = sAssignedThreads * 64;
     sSmallTaskCore = -1;
 }
@@ -575,7 +568,7 @@ disable_small_task_packing(void)
 static inline void
 increase_penalty(Thread* thread)
 {
-    if (thread->priority <= B_LOWEST_ACTIVE_PRIORITY)
+    if (thread->priority < B_LOWEST_ACTIVE_PRIORITY)
         return;
     if (thread->priority >= B_FIRST_REAL_TIME_PRIORITY)
         return;
@@ -589,23 +582,6 @@ increase_penalty(Thread* thread)
     const int kMinimalPriority = get_minimal_priority(thread);
     if (thread->priority - oldPenalty <= kMinimalPriority) {
-        int32 core = schedulerThreadData->previous_core;
-        ASSERT(core >= 0);
-
-        int32 additionalPenalty = schedulerThreadData->additional_penalty;
-        if (additionalPenalty == 0 && !sSingleCore) {
-            sCPUBoundThreads++;
-            sCoreEntries[core].fCPUBoundThreads++;
-
-            update_thread_heaps(core);
-        }
-
-        const int kSmallTaskThreshold = 50;
-        if (additionalPenalty > kSmallTaskThreshold && !sSingleCore) {
-            if (sSmallTaskCore == core)
-                disable_small_task_packing();
-        }
-
         schedulerThreadData->priority_penalty = oldPenalty;
         schedulerThreadData->additional_penalty++;
     }
@@ -727,9 +703,9 @@ choose_core_performance(Thread* thread)
         int32 priority = get_effective_priority(thread);
         if (CorePriorityHeap::GetKey(entry) >= priority) {
-            entry = sCoreThreadHeap->PeekMinimum();
+            entry = sCoreLoadHeap->PeekMinimum();
             if (entry == NULL)
-                entry = sCoreCPUBoundThreadHeap->PeekMinimum();
+                entry = sCoreHighLoadHeap->PeekMinimum();
         }
     }
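
With the thread-count heaps gone, performance mode picks the target core by
measured load: the least loaded core still under kHighLoad, falling back to
the least loaded of the saturated cores only when sCoreLoadHeap is empty. A
standalone sketch of that two-level fallback, with std::multiset of load
keys as a stand-in for the kernel's min-max heaps:

    #include <cstdio>
    #include <set>

    int main()
    {
        // keys of cores at or below kHighLoad, and above it
        std::multiset<int> loadHeap = { 150, 420 };
        std::multiset<int> highLoadHeap = { 830, 990 };

        // PeekMinimum() equivalent: *begin(); fall back only when the
        // normal-load heap is empty
        int chosen = !loadHeap.empty()
            ? *loadHeap.begin() : *highLoadHeap.begin();
        std::printf("chosen core load key: %d\n", chosen);  // 150
        return 0;
    }
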
@@ -758,10 +734,10 @@ choose_core_power_saving(Thread* thread)
         sDisableSmallTaskPacking--;
     if (!sDisableSmallTaskPacking && is_task_small(thread)
-        && sCoreThreadHeap->PeekMaximum() != NULL) {
+        && sCoreLoadHeap->PeekMaximum() != NULL) {
         // try to pack all threads on one core
         if (sSmallTaskCore < 0)
-            sSmallTaskCore = sCoreThreadHeap->PeekMaximum()->fCoreID;
+            sSmallTaskCore = sCoreLoadHeap->PeekMaximum()->fCoreID;
         entry = &sCoreEntries[sSmallTaskCore];
     } else if (sCorePriorityHeap->PeekRoot() != NULL
         && CorePriorityHeap::GetKey(sCorePriorityHeap->PeekRoot())
@@ -778,9 +754,9 @@ choose_core_power_saving(Thread* thread)
         entry = package->fIdleCores.Last();
     } else {
         // no idle cores, use least occupied core
-        entry = sCoreThreadHeap->PeekMinimum();
+        entry = sCoreLoadHeap->PeekMinimum();
         if (entry == NULL)
-            entry = sCoreCPUBoundThreadHeap->PeekMinimum();
+            entry = sCoreHighLoadHeap->PeekMinimum();
     }
 
     ASSERT(entry != NULL);
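
Power-saving mode uses the other end of the same structure: PeekMaximum() on
sCoreLoadHeap yields the busiest core that is still below the high-load
threshold, which becomes sSmallTaskCore, the packing target for small tasks.
Packing them there lets the remaining cores stay idle, and is why a min-max
heap is used instead of a plain min-heap. Sketch with the same stand-ins as
above:

    #include <cstdio>
    #include <set>

    int main()
    {
        // cores at or below kHighLoad, keyed by load
        std::multiset<int> loadHeap = { 150, 420 };

        if (!loadHeap.empty()) {
            // PeekMaximum() equivalent: busiest not-yet-saturated core
            std::printf("pack small tasks on key %d\n", *loadHeap.rbegin());
            // prints: pack small tasks on key 420
        }
        return 0;
    }
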
@@ -808,6 +784,7 @@ choose_cpu(int32 core)
 static bool
 should_rebalance(Thread* thread)
 {
+#if 0
     ASSERT(!sSingleCore);
 
     if (thread_is_idle_thread(thread))
@@ -857,31 +834,11 @@ should_rebalance(Thread* thread)
     // won't get much worse...
     const int32 kBalanceThreshold = 3;
     return threadsAboveAverage > kBalanceThreshold;
+#endif
+    return false;
 }
 
 
-static void
-assign_active_thread_to_core(Thread* thread)
-{
-    if (thread_is_idle_thread(thread) || sSingleCore)
-        return;
-
-    scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
-    ASSERT(schedulerThreadData->previous_core >= 0);
-
-    int32 core = schedulerThreadData->previous_core;
-
-    sCoreEntries[core].fThreads++;
-    sAssignedThreads++;
-    if (schedulerThreadData->additional_penalty != 0) {
-        sCoreEntries[core].fCPUBoundThreads++;
-        sCPUBoundThreads++;
-    }
-
-    update_thread_heaps(core);
-}
-
 
 static inline void
 thread_goes_away(Thread* thread)
@@ -896,24 +853,6 @@ thread_goes_away(Thread* thread)
     schedulerThreadData->went_sleep = system_time();
     schedulerThreadData->went_sleep_active = sCoreEntries[core].fActiveTime;
-
-    if (sSingleCore)
-        return;
-
-    ASSERT(sCoreEntries[core].fThreads > 0);
-    ASSERT(sCoreEntries[core].fThreads > sCoreEntries[core].fCPUBoundThreads
-        || (sCoreEntries[core].fThreads == sCoreEntries[core].fCPUBoundThreads
-            && schedulerThreadData->additional_penalty != 0));
-
-    sCoreEntries[core].fThreads--;
-    sAssignedThreads--;
-    if (schedulerThreadData->additional_penalty != 0) {
-        ASSERT(sCoreEntries[core].fCPUBoundThreads > 0);
-        sCoreEntries[core].fCPUBoundThreads--;
-        sCPUBoundThreads--;
-    }
-
-    update_thread_heaps(core);
 }
@@ -999,9 +938,6 @@ enqueue(Thread* thread, bool newOne)
         targetCPU = thread->previous_cpu->cpu_num;
         targetCore = sCPUToCore[targetCPU];
         ASSERT(targetCore == schedulerThreadData->previous_core);
-
-        if (newOne)
-            assign_active_thread_to_core(thread);
     } else if (sSingleCore) {
         targetCore = 0;
         targetCPU = choose_cpu(targetCore);
@@ -1015,20 +951,14 @@ enqueue(Thread* thread, bool newOne)
             targetCPU = thread->previous_cpu->cpu_num;
             targetCore = sCPUToCore[targetCPU];
         } else {
-            if (!newOne)
-                thread_goes_away(thread);
-
             targetCore = choose_core(thread);
             targetCPU = choose_cpu(targetCore);
         }
 
         schedulerThreadData->previous_core = targetCore;
-        assign_active_thread_to_core(thread);
     } else {
         targetCore = schedulerThreadData->previous_core;
         targetCPU = choose_cpu(targetCore);
-
-        if (newOne)
-            assign_active_thread_to_core(thread);
     }
 
     ASSERT(targetCore >= 0 && targetCore < sRunQueueCount);
@@ -1116,17 +1046,12 @@ scheduler_set_thread_priority(Thread *thread, int32 priority)
         thread->id, priority, thread->priority,
         get_effective_priority(thread));
 
-    if (thread->state == B_THREAD_RUNNING)
-        thread_goes_away(thread);
-
     if (thread->state != B_THREAD_READY) {
         cancel_penalty(thread);
         thread->priority = priority;
 
-        if (thread->state == B_THREAD_RUNNING) {
-            assign_active_thread_to_core(thread);
+        if (thread->state == B_THREAD_RUNNING)
             update_priority_heaps(thread->cpu->cpu_num, priority);
-        }
         return;
     }
@@ -1143,7 +1068,6 @@ scheduler_set_thread_priority(Thread *thread, int32 priority)
     int32 previousCore = thread->scheduler_data->previous_core;
     ASSERT(previousCore >= 0);
     sRunQueues[previousCore].Remove(thread);
-    thread_goes_away(thread);
 
     // set priority and re-insert
     cancel_penalty(thread);
@@ -1276,11 +1200,11 @@ dequeue_thread(int32 thisCPU)
 static inline void
 compute_cpu_load(int32 cpu)
 {
+    ASSERT(!sSingleCore);
     const bigtime_t kLoadMeasureInterval = 50000;
     const bigtime_t kIntervalInaccuracy = kLoadMeasureInterval / 4;
-    int32 thisCPU = smp_get_current_cpu();
     bigtime_t now = system_time();
     bigtime_t deltaTime = now - sCPUEntries[cpu].fMeasureTime;
@@ -1288,26 +1212,34 @@ compute_cpu_load(int32 cpu)
         return;
 
     int oldLoad = sCPUEntries[cpu].fLoad;
+    ASSERT(oldLoad >= 0 && oldLoad <= 1000);
 
     int load = sCPUEntries[cpu].fMeasureActiveTime * 1000;
     load /= max_c(now - sCPUEntries[cpu].fMeasureTime, 1);
     load = max_c(min_c(load, 1000), 0);
 
     sCPUEntries[cpu].fMeasureActiveTime = 0;
     sCPUEntries[cpu].fMeasureTime = now;
 
     deltaTime += kIntervalInaccuracy;
-    int n = max_c(deltaTime / kLoadMeasureInterval, 1);
+    int n = deltaTime / kLoadMeasureInterval;
+    ASSERT(n > 0);
 
     if (n > 10)
         sCPUEntries[cpu].fLoad = load;
     else {
         load *= (1 << n) - 1;
         sCPUEntries[cpu].fLoad = (sCPUEntries[cpu].fLoad + load) / (1 << n);
+        ASSERT(sCPUEntries[cpu].fLoad >= 0 && sCPUEntries[cpu].fLoad <= 1000);
     }
 
+    if (oldLoad != load) {
+        int32 core = sCPUToCore[cpu];
+        sCoreEntries[core].fLoad -= oldLoad;
+        sCoreEntries[core].fLoad += sCPUEntries[cpu].fLoad;
+
+        update_load_heaps(core);
+    }
 }
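
compute_cpu_load() turns the raw active time into a 0-1000 load sample and
folds it into fLoad with an exponential moving average: one halving step
toward the sample per elapsed 50 ms interval, i.e. fLoad' = (fLoad +
sample * (2^n - 1)) / 2^n. After ten intervals the old value would
contribute less than 0.1%, so it is replaced outright; the
kIntervalInaccuracy slack added to deltaTime compensates for the measurement
firing slightly early, keeping n >= 1 as the new ASSERT demands. A
standalone model of the averaging step:

    #include <cstdio>

    // one EMA step per elapsed 50 ms interval, on the 0-1000 scale:
    // new = (old + sample * (2^n - 1)) / 2^n
    static int blend_load(int oldLoad, int sample, int n)
    {
        if (n > 10)
            return sample;  // old value would contribute < 0.1%
        return (oldLoad + sample * ((1 << n) - 1)) / (1 << n);
    }

    int main()
    {
        // an idle CPU (load 0) that suddenly becomes fully busy (1000)
        std::printf("%d %d %d\n",
            blend_load(0, 1000, 1),   // 500
            blend_load(0, 1000, 2),   // 750
            blend_load(0, 1000, 3));  // 875
        return 0;
    }

The blended per-CPU value is then propagated to the owning core, whose fLoad
is the sum over its CPUs, and the core's position in the load heaps is
refreshed via update_load_heaps().
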
@@ -1338,7 +1270,8 @@ track_cpu_activity(Thread* oldThread, Thread* nextThread, int32 thisCore)
         sCoreEntries[thisCore].fActiveTime += active;
     }
 
-    compute_cpu_load(smp_get_current_cpu());
+    if (!sSingleCore)
+        compute_cpu_load(smp_get_current_cpu());
 
     int32 oldPriority = get_effective_priority(oldThread);
     int32 nextPriority = get_effective_priority(nextThread);
@@ -1410,7 +1343,6 @@ _scheduler_reschedule(void)
             TRACE("reschedule(): suspending thread %ld\n", oldThread->id);
             break;
         case THREAD_STATE_FREE_ON_RESCHED:
-            thread_goes_away(oldThread);
             break;
         default:
             increase_penalty(oldThread);
@@ -1455,7 +1387,6 @@ _scheduler_reschedule(void)
     nextThread->state = B_THREAD_RUNNING;
     nextThread->next_state = B_THREAD_READY;
     ASSERT(nextThread->scheduler_data->previous_core == thisCore);
-//  nextThread->scheduler_data->previous_core = thisCore;
 
     // track kernel time (user time is tracked in thread_at_kernel_entry())
     scheduler_update_thread_times(oldThread, nextThread);
@@ -1685,25 +1616,22 @@ _scheduler_init()
         return B_NO_MEMORY;
     ObjectDeleter<CorePriorityHeap> corePriorityHeapDeleter(sCorePriorityHeap);
 
-    sCoreThreadHeap = new CoreThreadHeap;
-    if (sCoreThreadHeap == NULL)
+    sCoreLoadHeap = new CoreLoadHeap;
+    if (sCoreLoadHeap == NULL)
         return B_NO_MEMORY;
-    ObjectDeleter<CoreThreadHeap> coreThreadHeapDeleter(sCoreThreadHeap);
+    ObjectDeleter<CoreLoadHeap> coreLoadHeapDeleter(sCoreLoadHeap);
 
-    sCoreCPUBoundThreadHeap = new CoreThreadHeap(coreCount);
-    if (sCoreCPUBoundThreadHeap == NULL)
+    sCoreHighLoadHeap = new CoreLoadHeap(coreCount);
+    if (sCoreHighLoadHeap == NULL)
         return B_NO_MEMORY;
-    ObjectDeleter<CoreThreadHeap> coreCPUThreadHeapDeleter(
-        sCoreCPUBoundThreadHeap);
+    ObjectDeleter<CoreLoadHeap> coreHighLoadHeapDeleter(sCoreHighLoadHeap);
 
     for (int32 i = 0; i < coreCount; i++) {
         sCoreEntries[i].fCoreID = i;
         sCoreEntries[i].fActiveTime = 0;
-        sCoreEntries[i].fThreads = 0;
-        sCoreEntries[i].fCPUBoundThreads = 0;
        sCoreEntries[i].fLoad = 0;
 
-        status_t result = sCoreThreadHeap->Insert(&sCoreEntries[i], 0);
+        status_t result = sCoreLoadHeap->Insert(&sCoreEntries[i], 0);
         if (result != B_OK)
             return result;
@@ -1765,7 +1693,7 @@ _scheduler_init()
         return result;
     }
 
-#if 0
+#if 1
     scheduler_set_operation_mode(SCHEDULER_MODE_PERFORMANCE);
 #else
     scheduler_set_operation_mode(SCHEDULER_MODE_POWER_SAVING);
@@ -1783,8 +1711,8 @@ _scheduler_init()
     runQueuesDeleter.Detach();
     pinnedRunQueuesDeleter.Detach();
-    coreCPUThreadHeapDeleter.Detach();
-    coreThreadHeapDeleter.Detach();
+    coreHighLoadHeapDeleter.Detach();
+    coreLoadHeapDeleter.Detach();
     corePriorityHeapDeleter.Detach();
     cpuPriorityHeapDeleter.Detach();
     coreEntriesDeleter.Detach();