scheduler_affine: Balance number of threads assigned to CPUs

When the thread cannot be run immediately assign it to the core with
lowest number of CPU bound threads and assigned threads.
This commit is contained in:
Pawel Dziepak 2013-10-21 21:39:40 +02:00
parent 5cf9b69b49
commit 7aba623f52
1 changed files with 134 additions and 44 deletions

View File

@ -52,7 +52,7 @@ const bigtime_t kCacheExpire = 100000;
static scheduler_mode sSchedulerMode;
static int32 (*sChooseCore)(void);
static int32 (*sChooseCore)(int32 priority);
// Heaps in sCPUPriorityHeaps are used for load balancing on a core the logical
@ -67,16 +67,28 @@ typedef MinMaxHeap<CPUEntry, int32> AffineCPUHeap;
static CPUEntry* sCPUEntries;
static AffineCPUHeap* sCPUPriorityHeaps;
struct CoreEntry : public HeapLinkImpl<CoreEntry, int32>,
DoublyLinkedListLinkImpl<CoreEntry> {
struct CoreEntry : public DoublyLinkedListLinkImpl<CoreEntry> {
HeapLink<CoreEntry, int32> fPriorityHeapLink;
HeapLink<CoreEntry, int32> fThreadHeapLink;
int32 fCoreID;
bigtime_t fActiveTime;
int32 fCPUBoundThreads;
int32 fThreads;
};
static CoreEntry* sCoreEntries;
typedef Heap<CoreEntry, int32> AffineCoreHeap;
static AffineCoreHeap* sCorePriorityHeap;
typedef Heap<CoreEntry, int32, HeapLesserCompare<int32>,
HeapMemberGetLink<CoreEntry, int32, &CoreEntry::fPriorityHeapLink> >
AffineCorePriorityHeap;
static AffineCorePriorityHeap* sCorePriorityHeap;
typedef Heap<CoreEntry, int32, HeapLesserCompare<int32>,
HeapMemberGetLink<CoreEntry, int32, &CoreEntry::fThreadHeapLink> >
AffineCoreThreadHeap;
static AffineCoreThreadHeap* sCoreThreadHeap;
// sPackageUsageHeap is used to decide which core should be woken up from the
// idle state. When aiming for performance we should use as many packages as
@ -249,7 +261,7 @@ dump_run_queue(int argc, char **argv)
static void
dump_heap(AffineCPUHeap* heap)
{
AffineCPUHeap temp;
AffineCPUHeap temp(smp_get_num_cpus());
kprintf("cpu priority actual priority\n");
CPUEntry* entry = heap->PeekMinimum();
@ -278,13 +290,13 @@ dump_heap(AffineCPUHeap* heap)
static int
dump_cpu_heap(int argc, char** argv)
{
AffineCoreHeap temp;
AffineCorePriorityHeap temp(sRunQueueCount);
kprintf("core priority\n");
CoreEntry* entry = sCorePriorityHeap->PeekRoot();
while (entry) {
int32 core = entry->fCoreID;
int32 key = AffineCoreHeap::GetKey(entry);
int32 key = AffineCorePriorityHeap::GetKey(entry);
kprintf("%4" B_PRId32 " %8" B_PRId32 "\n", core, key);
sCorePriorityHeap->RemoveRoot();
@ -295,12 +307,35 @@ dump_cpu_heap(int argc, char** argv)
entry = temp.PeekRoot();
while (entry) {
int32 key = AffineCoreHeap::GetKey(entry);
int32 key = AffineCorePriorityHeap::GetKey(entry);
temp.RemoveRoot();
sCorePriorityHeap->Insert(entry, key);
entry = temp.PeekRoot();
}
AffineCoreThreadHeap temp2(sRunQueueCount);
kprintf("\ncore key threads cpu-bound\n");
entry = sCoreThreadHeap->PeekRoot();
while (entry) {
int32 key = AffineCoreThreadHeap::GetKey(entry);
kprintf("%4" B_PRId32 " %6" B_PRId32 " %7" B_PRId32 " %9" B_PRId32 "\n",
entry->fCoreID, key, entry->fThreads, entry->fCPUBoundThreads);
sCoreThreadHeap->RemoveRoot();
temp2.Insert(entry, key);
entry = sCoreThreadHeap->PeekRoot();
}
entry = temp2.PeekRoot();
while (entry) {
int32 key = AffineCoreThreadHeap::GetKey(entry);
temp2.RemoveRoot();
sCoreThreadHeap->Insert(entry, key);
entry = temp2.PeekRoot();
}
for (int32 i = 0; i < sRunQueueCount; i++) {
kprintf("\nCore %" B_PRId32 " heap:\n", i);
dump_heap(&sCPUPriorityHeaps[i]);
@ -339,8 +374,7 @@ dump_idle_cores(int argc, char** argv)
} else
kprintf("No idle packages.\n");
AffinePackageHeap temp;
temp.GrowHeap(smp_get_num_cpus());
AffinePackageHeap temp(smp_get_num_cpus());
kprintf("\nPackages with idle cores:\n");
PackageEntry* entry = sPackageUsageHeap->PeekMinimum();
@ -439,6 +473,22 @@ affine_dump_thread_data(Thread* thread)
}
static void
affine_update_thread_heaps(int32 core)
{
CoreEntry* entry = &sCoreEntries[core];
ASSERT(entry->fCPUBoundThreads >= 0
&& entry->fCPUBoundThreads <= entry->fThreads);
ASSERT(entry->fThreads >= 0
&& entry->fThreads <= thread_max_threads());
int32 newKey = entry->fCPUBoundThreads * thread_max_threads();
newKey += entry->fThreads;
sCoreThreadHeap->ModifyKey(entry, newKey);
}
static inline void
affine_increase_penalty(Thread* thread)
{
@ -455,6 +505,13 @@ affine_increase_penalty(Thread* thread)
ASSERT(thread->priority - oldPenalty >= B_LOWEST_ACTIVE_PRIORITY);
const int kMinimalPriority = affine_get_minimal_priority(thread);
if (thread->priority - oldPenalty <= kMinimalPriority) {
int32 core = schedulerThreadData->previous_core;
ASSERT(core >= 0);
if (schedulerThreadData->additional_penalty == 0) {
sCoreEntries[core].fCPUBoundThreads++;
affine_update_thread_heaps(core);
}
schedulerThreadData->priority_penalty = oldPenalty;
schedulerThreadData->additional_penalty++;
}
@ -468,31 +525,20 @@ affine_cancel_penalty(Thread* thread)
if (schedulerThreadData->priority_penalty != 0)
TRACE("cancelling thread %ld penalty\n", thread->id);
if (schedulerThreadData->additional_penalty != 0) {
int32 core = schedulerThreadData->previous_core;
ASSERT(core >= 0);
sCoreEntries[core].fCPUBoundThreads--;
affine_update_thread_heaps(core);
}
schedulerThreadData->priority_penalty = 0;
schedulerThreadData->additional_penalty = 0;
}
/*! Returns the most idle CPU based on the active time counters.
Note: thread lock must be held when entering this function
*/
#if 0
static int32
affine_get_most_idle_cpu()
{
int32 targetCPU = -1;
for (int32 i = 0; i < smp_get_num_cpus(); i++) {
if (gCPU[i].disabled)
continue;
if (targetCPU < 0 || sRunQueueSize[i] < sRunQueueSize[targetCPU])
targetCPU = i;
}
return targetCPU;
}
#endif
static inline void
affine_update_priority_heaps(int32 cpu, int32 priority)
{
@ -502,7 +548,7 @@ affine_update_priority_heaps(int32 cpu, int32 priority)
int32 maxPriority
= AffineCPUHeap::GetKey(sCPUPriorityHeaps[core].PeekMaximum());
int32 corePriority = AffineCoreHeap::GetKey(&sCoreEntries[core]);
int32 corePriority = AffineCorePriorityHeap::GetKey(&sCoreEntries[core]);
if (corePriority != maxPriority) {
sCorePriorityHeap->ModifyKey(&sCoreEntries[core], maxPriority);
@ -567,7 +613,7 @@ affine_update_priority_heaps(int32 cpu, int32 priority)
static int32
affine_choose_core_performance(void)
affine_choose_core_performance(int32 priority)
{
CoreEntry* entry;
@ -582,6 +628,8 @@ affine_choose_core_performance(void)
} else {
// no idle cores, use least occupied core
entry = sCorePriorityHeap->PeekRoot();
if (AffineCorePriorityHeap::GetKey(entry) >= priority)
entry = sCoreThreadHeap->PeekRoot();
}
ASSERT(entry != NULL);
@ -590,7 +638,7 @@ affine_choose_core_performance(void)
static int32
affine_choose_core_power_saving(void)
affine_choose_core_power_saving(int32 priority)
{
CoreEntry* entry;
@ -606,6 +654,8 @@ affine_choose_core_power_saving(void)
} else {
// no idle cores, use least occupied core
entry = sCorePriorityHeap->PeekRoot();
if (AffineCorePriorityHeap::GetKey(entry) >= priority)
entry = sCoreThreadHeap->PeekRoot();
}
ASSERT(entry != NULL);
@ -614,9 +664,9 @@ affine_choose_core_power_saving(void)
static inline int32
affine_choose_core(void)
affine_choose_core(int32 priority)
{
return sChooseCore();
return sChooseCore(priority);
}
@ -629,6 +679,33 @@ affine_choose_cpu(int32 core)
}
static void
affine_assign_thread_to_core(Thread* thread, int32 targetCore)
{
scheduler_thread_data* schedulerThreadData = thread->scheduler_data;
int32 oldCore = schedulerThreadData->previous_core;
if (oldCore == targetCore)
return;
if (oldCore >= 0) {
sCoreEntries[oldCore].fThreads--;
if (schedulerThreadData->additional_penalty != 0)
sCoreEntries[oldCore].fCPUBoundThreads--;
affine_update_thread_heaps(oldCore);
}
schedulerThreadData->previous_core = targetCore;
if (targetCore >= 0) {
sCoreEntries[targetCore].fThreads++;
if (schedulerThreadData->additional_penalty != 0)
sCoreEntries[targetCore].fCPUBoundThreads++;
affine_update_thread_heaps(targetCore);
}
}
static void
affine_enqueue(Thread* thread, bool newOne)
{
@ -658,10 +735,11 @@ affine_enqueue(Thread* thread, bool newOne)
targetCPU = thread->previous_cpu->cpu_num;
targetCore = sCPUToCore[targetCPU];
} else {
targetCore = affine_choose_core();
targetCore = affine_choose_core(threadPriority);
targetCPU = affine_choose_cpu(targetCore);
}
schedulerThreadData->previous_core = targetCore;
affine_assign_thread_to_core(thread, targetCore);
} else {
targetCore = schedulerThreadData->previous_core;
targetCPU = affine_choose_cpu(targetCore);
@ -1068,6 +1146,7 @@ affine_reschedule(void)
TRACE("reschedule(): suspending thread %ld\n", oldThread->id);
break;
case THREAD_STATE_FREE_ON_RESCHED:
affine_assign_thread_to_core(oldThread, -1);
break;
default:
affine_thread_goes_sleep(oldThread, thisCore);
@ -1302,13 +1381,10 @@ scheduler_affine_init()
return B_NO_MEMORY;
ArrayDeleter<PackageEntry> packageEntriesDeleter(sPackageEntries);
sPackageUsageHeap = new(std::nothrow) AffinePackageHeap;
sPackageUsageHeap = new(std::nothrow) AffinePackageHeap(packageCount);
if (sPackageUsageHeap == NULL)
return B_NO_MEMORY;
ObjectDeleter<AffinePackageHeap> packageHeapDeleter(sPackageUsageHeap);
result = sPackageUsageHeap->GrowHeap(packageCount);
if (result != B_OK)
return B_OK;
sIdlePackageList = new(std::nothrow) AffineIdlePackageList;
if (sIdlePackageList == NULL)
@ -1334,18 +1410,31 @@ scheduler_affine_init()
ArrayDeleter<CoreEntry> coreEntriesDeleter(
sCoreEntries);
sCorePriorityHeap = new AffineCoreHeap;
sCorePriorityHeap = new AffineCorePriorityHeap;
if (sCorePriorityHeap == NULL)
return B_NO_MEMORY;
ObjectDeleter<AffineCoreHeap> corePriorityHeapDeleter(sCorePriorityHeap);
ObjectDeleter<AffineCorePriorityHeap> corePriorityHeapDeleter(
sCorePriorityHeap);
sCoreThreadHeap = new AffineCoreThreadHeap;
if (sCoreThreadHeap == NULL)
return B_NO_MEMORY;
ObjectDeleter<AffineCoreThreadHeap> coreThreadHeapDeleter(sCoreThreadHeap);
for (int32 i = 0; i < coreCount; i++) {
sCoreEntries[i].fCoreID = i;
sCoreEntries[i].fActiveTime = 0;
sCoreEntries[i].fThreads = 0;
sCoreEntries[i].fCPUBoundThreads = 0;
status_t result = sCorePriorityHeap->Insert(&sCoreEntries[i],
B_IDLE_PRIORITY);
if (result != B_OK)
return result;
result = sCoreThreadHeap->Insert(&sCoreEntries[i], 0);
if (result != B_OK)
return result;
}
sCPUPriorityHeaps = new AffineCPUHeap[coreCount];
@ -1408,6 +1497,7 @@ scheduler_affine_init()
runQueuesDeleter.Detach();
pinnedRunQueuesDeleter.Detach();
coreThreadHeapDeleter.Detach();
corePriorityHeapDeleter.Detach();
cpuPriorityHeapDeleter.Detach();
coreEntriesDeleter.Detach();