scheduler_affine: Store cores with CPU bound threads separately

This is preparation for small task packing. We want to keep as many cores
idle as possible. To achieve that, we put all threads on the most heavily
loaded core (so the other cores can become idle). However, we don't really
want to do that if there are CPU-bound tasks around, or if any of the cores
becomes overloaded.
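The key scheme can be summarized as follows: a core's key in the thread heap is
fCPUBoundThreads * thread_max_threads() + fThreads, so a core that runs even one
CPU-bound thread gets a key above thread_max_threads() and is kept in the separate
sCoreCPUBoundThreadHeap. Below is a minimal standalone sketch of that bookkeeping,
using std::set in place of Haiku's MinMaxHeap; the names (CoreLoad, kMaxThreads,
UpdateCore, ChoosePackingTarget) are illustrative only and do not appear in the tree.

#include <cstdint>
#include <iostream>
#include <set>
#include <utility>

const int32_t kMaxThreads = 1024;	// stand-in for thread_max_threads()

struct CoreLoad {
	int32_t coreID;
	int32_t threads;		// all ready/running threads assigned to this core
	int32_t cpuBoundThreads;	// subset that keeps using its whole quantum

	// CPU-bound threads dominate the key, so a core running even one of
	// them sorts after every core that runs none.
	int32_t Key() const { return cpuBoundThreads * kMaxThreads + threads; }
};

// (key, coreID) pairs ordered by key; begin() is the least loaded core and
// rbegin() the most loaded one, standing in for PeekMinimum()/PeekMaximum().
typedef std::set<std::pair<int32_t, int32_t> > LoadSet;

LoadSet sNormalCores;		// cores without CPU-bound threads
LoadSet sCPUBoundCores;		// cores running at least one CPU-bound thread

// Re-file a core after its thread counts changed, moving it between the two
// sets when it gains or loses its last CPU-bound thread.
void
UpdateCore(const CoreLoad& core, int32_t oldKey)
{
	LoadSet& oldSet = oldKey > kMaxThreads ? sCPUBoundCores : sNormalCores;
	LoadSet& newSet
		= core.Key() > kMaxThreads ? sCPUBoundCores : sNormalCores;

	oldSet.erase(std::make_pair(oldKey, core.coreID));
	newSet.insert(std::make_pair(core.Key(), core.coreID));
}

// Packing target: the most loaded core that has no CPU-bound threads, falling
// back to the least loaded CPU-bound core only if every core is CPU bound.
int32_t
ChoosePackingTarget()
{
	if (!sNormalCores.empty())
		return sNormalCores.rbegin()->second;
	if (!sCPUBoundCores.empty())
		return sCPUBoundCores.begin()->second;
	return -1;
}

int
main()
{
	CoreLoad core0 = { 0, 3, 0 };
	CoreLoad core1 = { 1, 1, 0 };
	sNormalCores.insert(std::make_pair(core0.Key(), core0.coreID));
	sNormalCores.insert(std::make_pair(core1.Key(), core1.coreID));

	std::cout << "pack onto core " << ChoosePackingTarget() << "\n";	// 0

	// core 0 picks up a CPU-bound thread: its key jumps past kMaxThreads,
	// the core leaves the normal set, and packing now prefers core 1.
	int32_t oldKey = core0.Key();
	core0.cpuBoundThreads = 1;
	UpdateCore(core0, oldKey);

	std::cout << "pack onto core " << ChoosePackingTarget() << "\n";	// 1
	return 0;
}

With the two structures kept apart, the packing path can take the most loaded entry
of the regular heap (PeekMaximum()) without piling more work onto cores that already
run CPU-bound threads, and the CPU-bound heap is only consulted (PeekMinimum()) when
the regular heap is empty.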
Author: Pawel Dziepak
Date: 2013-10-22 23:52:40 +02:00
parent f823aacf59
commit 8d471bc3d9


@@ -50,6 +50,8 @@ const bigtime_t kMaxThreadQuantum = 10000;
const bigtime_t kCacheExpire = 100000;
+static bigtime_t sDisableSmallTaskPacking;
static scheduler_mode sSchedulerMode;
static int32 (*sChooseCore)(int32 priority);
@@ -69,7 +71,7 @@ static AffineCPUHeap* sCPUPriorityHeaps;
struct CoreEntry : public DoublyLinkedListLinkImpl<CoreEntry> {
HeapLink<CoreEntry, int32> fPriorityHeapLink;
-HeapLink<CoreEntry, int32> fThreadHeapLink;
+MinMaxHeapLink<CoreEntry, int32> fThreadHeapLink;
int32 fCoreID;
@@ -85,10 +87,12 @@ typedef Heap<CoreEntry, int32, HeapLesserCompare<int32>,
AffineCorePriorityHeap;
static AffineCorePriorityHeap* sCorePriorityHeap;
-typedef Heap<CoreEntry, int32, HeapLesserCompare<int32>,
-HeapMemberGetLink<CoreEntry, int32, &CoreEntry::fThreadHeapLink> >
+typedef MinMaxHeap<CoreEntry, int32, MinMaxHeapCompare<int32>,
+MinMaxHeapMemberGetLink<CoreEntry, int32, &CoreEntry::fThreadHeapLink> >
AffineCoreThreadHeap;
static AffineCoreThreadHeap* sCoreThreadHeap;
+static AffineCoreThreadHeap* sCoreCPUBoundThreadHeap;
static int32 sCPUBoundThreads;
static int32 sAssignedThreads;
@@ -289,13 +293,44 @@ dump_heap(AffineCPUHeap* heap)
}
+static void
+dump_core_thread_heap(AffineCoreThreadHeap* heap)
+{
+AffineCoreThreadHeap temp(sRunQueueCount);
+CoreEntry* entry = heap->PeekMinimum();
+while (entry) {
+int32 key = AffineCoreThreadHeap::GetKey(entry);
+kprintf("%4" B_PRId32 " %6" B_PRId32 " %7" B_PRId32 " %9" B_PRId32 "\n",
+entry->fCoreID, key, entry->fThreads, entry->fCPUBoundThreads);
+heap->RemoveMinimum();
+temp.Insert(entry, key);
+entry = heap->PeekMinimum();
+}
+entry = temp.PeekMinimum();
+while (entry) {
+int32 key = AffineCoreThreadHeap::GetKey(entry);
+temp.RemoveMinimum();
+heap->Insert(entry, key);
+entry = temp.PeekMinimum();
+}
+}
static int
dump_cpu_heap(int argc, char** argv)
{
AffineCorePriorityHeap temp(sRunQueueCount);
-kprintf("core priority\n");
CoreEntry* entry = sCorePriorityHeap->PeekRoot();
+if (entry != NULL)
+kprintf("core priority\n");
+else
+kprintf("No active cores.\n");
while (entry) {
int32 core = entry->fCoreID;
int32 key = AffineCorePriorityHeap::GetKey(entry);
@@ -315,28 +350,9 @@ dump_cpu_heap(int argc, char** argv)
entry = temp.PeekRoot();
}
-AffineCoreThreadHeap temp2(sRunQueueCount);
kprintf("\ncore key threads cpu-bound\n");
-entry = sCoreThreadHeap->PeekRoot();
-while (entry) {
-int32 key = AffineCoreThreadHeap::GetKey(entry);
-kprintf("%4" B_PRId32 " %6" B_PRId32 " %7" B_PRId32 " %9" B_PRId32 "\n",
-entry->fCoreID, key, entry->fThreads, entry->fCPUBoundThreads);
-sCoreThreadHeap->RemoveRoot();
-temp2.Insert(entry, key);
-entry = sCoreThreadHeap->PeekRoot();
-}
-entry = temp2.PeekRoot();
-while (entry) {
-int32 key = AffineCoreThreadHeap::GetKey(entry);
-temp2.RemoveRoot();
-sCoreThreadHeap->Insert(entry, key);
-entry = temp2.PeekRoot();
-}
+dump_core_thread_heap(sCoreThreadHeap);
+dump_core_thread_heap(sCoreCPUBoundThreadHeap);
for (int32 i = 0; i < sRunQueueCount; i++) {
kprintf("\nCore %" B_PRId32 " heap:\n", i);
@@ -487,7 +503,32 @@ affine_update_thread_heaps(int32 core)
int32 newKey = entry->fCPUBoundThreads * thread_max_threads();
newKey += entry->fThreads;
-sCoreThreadHeap->ModifyKey(entry, newKey);
+int32 oldKey = AffineCoreThreadHeap::GetKey(entry);
+if (oldKey == newKey)
+return;
+if (newKey > thread_max_threads()) {
+if (oldKey <= thread_max_threads()) {
+sCoreThreadHeap->ModifyKey(entry, -1);
+ASSERT(sCoreThreadHeap->PeekMinimum() == entry);
+sCoreThreadHeap->RemoveMinimum();
+ASSERT(sCoreThreadHeap->PeekMinimum() != entry);
+sCoreCPUBoundThreadHeap->Insert(entry, newKey);
+} else
+sCoreCPUBoundThreadHeap->ModifyKey(entry, newKey);
+} else {
+if (oldKey > thread_max_threads()) {
+sCoreCPUBoundThreadHeap->ModifyKey(entry, -1);
+ASSERT(sCoreCPUBoundThreadHeap->PeekMinimum() == entry);
+sCoreCPUBoundThreadHeap->RemoveMinimum();
+sCoreThreadHeap->Insert(entry, newKey);
+} else
+sCoreThreadHeap->ModifyKey(entry, newKey);
+}
}
@@ -546,7 +587,14 @@ affine_update_priority_heaps(int32 cpu, int32 priority)
int32 corePriority = AffineCorePriorityHeap::GetKey(&sCoreEntries[core]);
if (corePriority != maxPriority) {
-sCorePriorityHeap->ModifyKey(&sCoreEntries[core], maxPriority);
+if (maxPriority == B_IDLE_PRIORITY) {
+sCorePriorityHeap->ModifyKey(&sCoreEntries[core], B_IDLE_PRIORITY);
+ASSERT(sCorePriorityHeap->PeekRoot() == &sCoreEntries[core]);
+sCorePriorityHeap->RemoveRoot();
+} else if (corePriority == B_IDLE_PRIORITY)
+sCorePriorityHeap->Insert(&sCoreEntries[core], maxPriority);
+else
+sCorePriorityHeap->ModifyKey(&sCoreEntries[core], maxPriority);
int32 package = sCPUToPackage[cpu];
PackageEntry* packageEntry = &sPackageEntries[package];
@@ -623,8 +671,11 @@ affine_choose_core_performance(int32 priority)
} else {
// no idle cores, use least occupied core
entry = sCorePriorityHeap->PeekRoot();
-if (AffineCorePriorityHeap::GetKey(entry) >= priority)
-entry = sCoreThreadHeap->PeekRoot();
+if (AffineCorePriorityHeap::GetKey(entry) >= priority) {
+entry = sCoreThreadHeap->PeekMinimum();
+if (entry == NULL)
+entry = sCoreCPUBoundThreadHeap->PeekMinimum();
+}
}
ASSERT(entry != NULL);
@@ -637,8 +688,14 @@ affine_choose_core_power_saving(int32 priority)
{
CoreEntry* entry;
-// TODO: small tasks packing
-if (sPackageUsageHeap->PeekMinimum() != NULL) {
+entry = sCorePriorityHeap->PeekRoot();
+if (entry != NULL && AffineCorePriorityHeap::GetKey(entry) < priority) {
+// run immediately on already woken core
+} else if (sDisableSmallTaskPacking < system_time()
+&& sCoreThreadHeap->PeekMaximum() != NULL) {
+// try to pack all threads on one core
+entry = sCoreThreadHeap->PeekMaximum();
+} else if (sPackageUsageHeap->PeekMinimum() != NULL) {
// wake new core
PackageEntry* package = sPackageUsageHeap->PeekMinimum();
entry = package->fIdleCores.Last();
@@ -648,9 +705,9 @@ affine_choose_core_power_saving(int32 priority)
entry = package->fIdleCores.Last();
} else {
// no idle cores, use least occupied core
-entry = sCorePriorityHeap->PeekRoot();
-if (AffineCorePriorityHeap::GetKey(entry) >= priority)
-entry = sCoreThreadHeap->PeekRoot();
+entry = sCoreThreadHeap->PeekMinimum();
+if (entry == NULL)
+entry = sCoreCPUBoundThreadHeap->PeekMinimum();
}
ASSERT(entry != NULL);
@@ -698,9 +755,11 @@ affine_should_rebalance(Thread* thread)
// this one to someone less busy.
int32 averageThread = sAssignedThreads / sRunQueueCount;
if (coreEntry->fCPUBoundThreads > 0) {
-CoreEntry* other = sCoreThreadHeap->PeekRoot();
-if (AffineCoreThreadHeap::GetKey(other) <= averageThread)
+CoreEntry* other = sCoreThreadHeap->PeekMinimum();
+if (other != NULL
+&& AffineCoreThreadHeap::GetKey(other) <= averageThread) {
return true;
+}
}
// No cpu bound threads - the situation is quite good. Make sure it
@@ -767,6 +826,8 @@ affine_thread_goes_away(Thread* thread)
static void
affine_enqueue(Thread* thread, bool newOne)
{
+ASSERT(thread != NULL);
thread->state = thread->next_state = B_THREAD_READY;
scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
@@ -813,6 +874,9 @@ affine_enqueue(Thread* thread, bool newOne)
affine_assign_active_thread_to_core(thread);
}
+ASSERT(targetCore >= 0 && targetCore < sRunQueueCount);
+ASSERT(targetCPU >= 0 && targetCPU < smp_get_num_cpus());
TRACE("enqueueing thread %ld with priority %ld %ld\n", thread->id,
threadPriority, targetCore);
if (pinned)
@@ -895,17 +959,17 @@ affine_set_thread_priority(Thread *thread, int32 priority)
thread->id, priority, thread->priority,
affine_get_effective_priority(thread));
-if (thread->state == B_THREAD_RUNNING) {
+if (thread->state == B_THREAD_RUNNING)
affine_thread_goes_away(thread);
-affine_update_priority_heaps(thread->cpu->cpu_num, priority);
-}
if (thread->state != B_THREAD_READY) {
affine_cancel_penalty(thread);
thread->priority = priority;
-if (thread->state == B_THREAD_RUNNING)
+if (thread->state == B_THREAD_RUNNING) {
affine_assign_active_thread_to_core(thread);
+affine_update_priority_heaps(thread->cpu->cpu_num, priority);
+}
return;
}
@@ -1403,7 +1467,7 @@ scheduler_affine_init()
ArrayDeleter<CoreEntry> coreEntriesDeleter(
sCoreEntries);
-sCorePriorityHeap = new AffineCorePriorityHeap;
+sCorePriorityHeap = new AffineCorePriorityHeap(coreCount);
if (sCorePriorityHeap == NULL)
return B_NO_MEMORY;
ObjectDeleter<AffineCorePriorityHeap> corePriorityHeapDeleter(
@@ -1414,20 +1478,26 @@ scheduler_affine_init()
return B_NO_MEMORY;
ObjectDeleter<AffineCoreThreadHeap> coreThreadHeapDeleter(sCoreThreadHeap);
+sCoreCPUBoundThreadHeap = new AffineCoreThreadHeap(coreCount);
+if (sCoreCPUBoundThreadHeap == NULL)
+return B_NO_MEMORY;
+ObjectDeleter<AffineCoreThreadHeap> coreCPUThreadHeapDeleter(
+sCoreCPUBoundThreadHeap);
for (int32 i = 0; i < coreCount; i++) {
sCoreEntries[i].fCoreID = i;
sCoreEntries[i].fActiveTime = 0;
sCoreEntries[i].fThreads = 0;
sCoreEntries[i].fCPUBoundThreads = 0;
-status_t result = sCorePriorityHeap->Insert(&sCoreEntries[i],
-B_IDLE_PRIORITY);
+status_t result = sCoreThreadHeap->Insert(&sCoreEntries[i], 0);
if (result != B_OK)
return result;
-result = sCoreThreadHeap->Insert(&sCoreEntries[i], 0);
+result = sCorePriorityHeap->Insert(&sCoreEntries[i], B_IDLE_PRIORITY);
if (result != B_OK)
return result;
+sCorePriorityHeap->RemoveRoot();
}
sCPUPriorityHeaps = new AffineCPUHeap[coreCount];
@@ -1477,7 +1547,11 @@ scheduler_affine_init()
return result;
}
+#if 1
+affine_set_operation_mode(SCHEDULER_MODE_POWER_SAVING);
+#else
affine_set_operation_mode(SCHEDULER_MODE_PERFORMANCE);
+#endif
gScheduler = &kAffineOps;
add_debugger_command_etc("run_queue", &dump_run_queue,
@@ -1490,6 +1564,7 @@ scheduler_affine_init()
runQueuesDeleter.Detach();
pinnedRunQueuesDeleter.Detach();
+coreCPUThreadHeapDeleter.Detach();
coreThreadHeapDeleter.Detach();
corePriorityHeapDeleter.Detach();
cpuPriorityHeapDeleter.Detach();