From 7aba623f524cb190460597f5d54bd9e890363def Mon Sep 17 00:00:00 2001 From: Pawel Dziepak Date: Mon, 21 Oct 2013 21:39:40 +0200 Subject: [PATCH] scheduler_affine: Balance number of threads assigned to CPUs When the thread cannot be run immediately assign it to the core with lowest number of CPU bound threads and assigned threads. --- .../kernel/scheduler/scheduler_affine.cpp | 178 +++++++++++++----- 1 file changed, 134 insertions(+), 44 deletions(-) diff --git a/src/system/kernel/scheduler/scheduler_affine.cpp b/src/system/kernel/scheduler/scheduler_affine.cpp index 094dc64a28..18d3139fab 100644 --- a/src/system/kernel/scheduler/scheduler_affine.cpp +++ b/src/system/kernel/scheduler/scheduler_affine.cpp @@ -52,7 +52,7 @@ const bigtime_t kCacheExpire = 100000; static scheduler_mode sSchedulerMode; -static int32 (*sChooseCore)(void); +static int32 (*sChooseCore)(int32 priority); // Heaps in sCPUPriorityHeaps are used for load balancing on a core the logical @@ -67,16 +67,28 @@ typedef MinMaxHeap AffineCPUHeap; static CPUEntry* sCPUEntries; static AffineCPUHeap* sCPUPriorityHeaps; -struct CoreEntry : public HeapLinkImpl, - DoublyLinkedListLinkImpl { +struct CoreEntry : public DoublyLinkedListLinkImpl { + HeapLink fPriorityHeapLink; + HeapLink fThreadHeapLink; + int32 fCoreID; bigtime_t fActiveTime; + + int32 fCPUBoundThreads; + int32 fThreads; }; static CoreEntry* sCoreEntries; -typedef Heap AffineCoreHeap; -static AffineCoreHeap* sCorePriorityHeap; +typedef Heap, + HeapMemberGetLink > + AffineCorePriorityHeap; +static AffineCorePriorityHeap* sCorePriorityHeap; + +typedef Heap, + HeapMemberGetLink > + AffineCoreThreadHeap; +static AffineCoreThreadHeap* sCoreThreadHeap; // sPackageUsageHeap is used to decide which core should be woken up from the // idle state. When aiming for performance we should use as many packages as @@ -249,7 +261,7 @@ dump_run_queue(int argc, char **argv) static void dump_heap(AffineCPUHeap* heap) { - AffineCPUHeap temp; + AffineCPUHeap temp(smp_get_num_cpus()); kprintf("cpu priority actual priority\n"); CPUEntry* entry = heap->PeekMinimum(); @@ -278,13 +290,13 @@ dump_heap(AffineCPUHeap* heap) static int dump_cpu_heap(int argc, char** argv) { - AffineCoreHeap temp; + AffineCorePriorityHeap temp(sRunQueueCount); kprintf("core priority\n"); CoreEntry* entry = sCorePriorityHeap->PeekRoot(); while (entry) { int32 core = entry->fCoreID; - int32 key = AffineCoreHeap::GetKey(entry); + int32 key = AffineCorePriorityHeap::GetKey(entry); kprintf("%4" B_PRId32 " %8" B_PRId32 "\n", core, key); sCorePriorityHeap->RemoveRoot(); @@ -295,12 +307,35 @@ dump_cpu_heap(int argc, char** argv) entry = temp.PeekRoot(); while (entry) { - int32 key = AffineCoreHeap::GetKey(entry); + int32 key = AffineCorePriorityHeap::GetKey(entry); temp.RemoveRoot(); sCorePriorityHeap->Insert(entry, key); entry = temp.PeekRoot(); } + AffineCoreThreadHeap temp2(sRunQueueCount); + + kprintf("\ncore key threads cpu-bound\n"); + entry = sCoreThreadHeap->PeekRoot(); + while (entry) { + int32 key = AffineCoreThreadHeap::GetKey(entry); + kprintf("%4" B_PRId32 " %6" B_PRId32 " %7" B_PRId32 " %9" B_PRId32 "\n", + entry->fCoreID, key, entry->fThreads, entry->fCPUBoundThreads); + + sCoreThreadHeap->RemoveRoot(); + temp2.Insert(entry, key); + + entry = sCoreThreadHeap->PeekRoot(); + } + + entry = temp2.PeekRoot(); + while (entry) { + int32 key = AffineCoreThreadHeap::GetKey(entry); + temp2.RemoveRoot(); + sCoreThreadHeap->Insert(entry, key); + entry = temp2.PeekRoot(); + } + for (int32 i = 0; i < sRunQueueCount; i++) { kprintf("\nCore %" B_PRId32 " heap:\n", i); dump_heap(&sCPUPriorityHeaps[i]); @@ -339,8 +374,7 @@ dump_idle_cores(int argc, char** argv) } else kprintf("No idle packages.\n"); - AffinePackageHeap temp; - temp.GrowHeap(smp_get_num_cpus()); + AffinePackageHeap temp(smp_get_num_cpus()); kprintf("\nPackages with idle cores:\n"); PackageEntry* entry = sPackageUsageHeap->PeekMinimum(); @@ -439,6 +473,22 @@ affine_dump_thread_data(Thread* thread) } +static void +affine_update_thread_heaps(int32 core) +{ + CoreEntry* entry = &sCoreEntries[core]; + + ASSERT(entry->fCPUBoundThreads >= 0 + && entry->fCPUBoundThreads <= entry->fThreads); + ASSERT(entry->fThreads >= 0 + && entry->fThreads <= thread_max_threads()); + + int32 newKey = entry->fCPUBoundThreads * thread_max_threads(); + newKey += entry->fThreads; + sCoreThreadHeap->ModifyKey(entry, newKey); +} + + static inline void affine_increase_penalty(Thread* thread) { @@ -455,6 +505,13 @@ affine_increase_penalty(Thread* thread) ASSERT(thread->priority - oldPenalty >= B_LOWEST_ACTIVE_PRIORITY); const int kMinimalPriority = affine_get_minimal_priority(thread); if (thread->priority - oldPenalty <= kMinimalPriority) { + int32 core = schedulerThreadData->previous_core; + ASSERT(core >= 0); + if (schedulerThreadData->additional_penalty == 0) { + sCoreEntries[core].fCPUBoundThreads++; + affine_update_thread_heaps(core); + } + schedulerThreadData->priority_penalty = oldPenalty; schedulerThreadData->additional_penalty++; } @@ -468,31 +525,20 @@ affine_cancel_penalty(Thread* thread) if (schedulerThreadData->priority_penalty != 0) TRACE("cancelling thread %ld penalty\n", thread->id); + + if (schedulerThreadData->additional_penalty != 0) { + int32 core = schedulerThreadData->previous_core; + ASSERT(core >= 0); + + sCoreEntries[core].fCPUBoundThreads--; + affine_update_thread_heaps(core); + } + schedulerThreadData->priority_penalty = 0; schedulerThreadData->additional_penalty = 0; } -/*! Returns the most idle CPU based on the active time counters. - Note: thread lock must be held when entering this function -*/ -#if 0 -static int32 -affine_get_most_idle_cpu() -{ - int32 targetCPU = -1; - for (int32 i = 0; i < smp_get_num_cpus(); i++) { - if (gCPU[i].disabled) - continue; - if (targetCPU < 0 || sRunQueueSize[i] < sRunQueueSize[targetCPU]) - targetCPU = i; - } - - return targetCPU; -} -#endif - - static inline void affine_update_priority_heaps(int32 cpu, int32 priority) { @@ -502,7 +548,7 @@ affine_update_priority_heaps(int32 cpu, int32 priority) int32 maxPriority = AffineCPUHeap::GetKey(sCPUPriorityHeaps[core].PeekMaximum()); - int32 corePriority = AffineCoreHeap::GetKey(&sCoreEntries[core]); + int32 corePriority = AffineCorePriorityHeap::GetKey(&sCoreEntries[core]); if (corePriority != maxPriority) { sCorePriorityHeap->ModifyKey(&sCoreEntries[core], maxPriority); @@ -567,7 +613,7 @@ affine_update_priority_heaps(int32 cpu, int32 priority) static int32 -affine_choose_core_performance(void) +affine_choose_core_performance(int32 priority) { CoreEntry* entry; @@ -582,6 +628,8 @@ affine_choose_core_performance(void) } else { // no idle cores, use least occupied core entry = sCorePriorityHeap->PeekRoot(); + if (AffineCorePriorityHeap::GetKey(entry) >= priority) + entry = sCoreThreadHeap->PeekRoot(); } ASSERT(entry != NULL); @@ -590,7 +638,7 @@ affine_choose_core_performance(void) static int32 -affine_choose_core_power_saving(void) +affine_choose_core_power_saving(int32 priority) { CoreEntry* entry; @@ -606,6 +654,8 @@ affine_choose_core_power_saving(void) } else { // no idle cores, use least occupied core entry = sCorePriorityHeap->PeekRoot(); + if (AffineCorePriorityHeap::GetKey(entry) >= priority) + entry = sCoreThreadHeap->PeekRoot(); } ASSERT(entry != NULL); @@ -614,9 +664,9 @@ affine_choose_core_power_saving(void) static inline int32 -affine_choose_core(void) +affine_choose_core(int32 priority) { - return sChooseCore(); + return sChooseCore(priority); } @@ -629,6 +679,33 @@ affine_choose_cpu(int32 core) } +static void +affine_assign_thread_to_core(Thread* thread, int32 targetCore) +{ + scheduler_thread_data* schedulerThreadData = thread->scheduler_data; + + int32 oldCore = schedulerThreadData->previous_core; + if (oldCore == targetCore) + return; + + if (oldCore >= 0) { + sCoreEntries[oldCore].fThreads--; + if (schedulerThreadData->additional_penalty != 0) + sCoreEntries[oldCore].fCPUBoundThreads--; + + affine_update_thread_heaps(oldCore); + } + + schedulerThreadData->previous_core = targetCore; + if (targetCore >= 0) { + sCoreEntries[targetCore].fThreads++; + if (schedulerThreadData->additional_penalty != 0) + sCoreEntries[targetCore].fCPUBoundThreads++; + affine_update_thread_heaps(targetCore); + } +} + + static void affine_enqueue(Thread* thread, bool newOne) { @@ -658,10 +735,11 @@ affine_enqueue(Thread* thread, bool newOne) targetCPU = thread->previous_cpu->cpu_num; targetCore = sCPUToCore[targetCPU]; } else { - targetCore = affine_choose_core(); + targetCore = affine_choose_core(threadPriority); targetCPU = affine_choose_cpu(targetCore); } - schedulerThreadData->previous_core = targetCore; + + affine_assign_thread_to_core(thread, targetCore); } else { targetCore = schedulerThreadData->previous_core; targetCPU = affine_choose_cpu(targetCore); @@ -1068,6 +1146,7 @@ affine_reschedule(void) TRACE("reschedule(): suspending thread %ld\n", oldThread->id); break; case THREAD_STATE_FREE_ON_RESCHED: + affine_assign_thread_to_core(oldThread, -1); break; default: affine_thread_goes_sleep(oldThread, thisCore); @@ -1302,13 +1381,10 @@ scheduler_affine_init() return B_NO_MEMORY; ArrayDeleter packageEntriesDeleter(sPackageEntries); - sPackageUsageHeap = new(std::nothrow) AffinePackageHeap; + sPackageUsageHeap = new(std::nothrow) AffinePackageHeap(packageCount); if (sPackageUsageHeap == NULL) return B_NO_MEMORY; ObjectDeleter packageHeapDeleter(sPackageUsageHeap); - result = sPackageUsageHeap->GrowHeap(packageCount); - if (result != B_OK) - return B_OK; sIdlePackageList = new(std::nothrow) AffineIdlePackageList; if (sIdlePackageList == NULL) @@ -1334,18 +1410,31 @@ scheduler_affine_init() ArrayDeleter coreEntriesDeleter( sCoreEntries); - sCorePriorityHeap = new AffineCoreHeap; + sCorePriorityHeap = new AffineCorePriorityHeap; if (sCorePriorityHeap == NULL) return B_NO_MEMORY; - ObjectDeleter corePriorityHeapDeleter(sCorePriorityHeap); + ObjectDeleter corePriorityHeapDeleter( + sCorePriorityHeap); + + sCoreThreadHeap = new AffineCoreThreadHeap; + if (sCoreThreadHeap == NULL) + return B_NO_MEMORY; + ObjectDeleter coreThreadHeapDeleter(sCoreThreadHeap); for (int32 i = 0; i < coreCount; i++) { sCoreEntries[i].fCoreID = i; sCoreEntries[i].fActiveTime = 0; + sCoreEntries[i].fThreads = 0; + sCoreEntries[i].fCPUBoundThreads = 0; + status_t result = sCorePriorityHeap->Insert(&sCoreEntries[i], B_IDLE_PRIORITY); if (result != B_OK) return result; + + result = sCoreThreadHeap->Insert(&sCoreEntries[i], 0); + if (result != B_OK) + return result; } sCPUPriorityHeaps = new AffineCPUHeap[coreCount]; @@ -1408,6 +1497,7 @@ scheduler_affine_init() runQueuesDeleter.Detach(); pinnedRunQueuesDeleter.Detach(); + coreThreadHeapDeleter.Detach(); corePriorityHeapDeleter.Detach(); cpuPriorityHeapDeleter.Detach(); coreEntriesDeleter.Detach();