/*
 * oskit/oskit-20020317/threads/sched_stride/sched_stride.c
 *
 * (Repository listing: 644 lines, 15 KiB, C, executable file.)
 */

/*
* Copyright (c) 2000, 2001 University of Utah and the Flux Group.
* All rights reserved.
*
* This file is part of the Flux OSKit. The OSKit is free software, also known
* as "open source;" you can redistribute it and/or modify it under the terms
* of the GNU General Public License (GPL), version 2, as published by the Free
* Software Foundation (FSF). To explore alternate licensing terms, contact
* the University of Utah at csl-dist@cs.utah.edu or +1-801-585-3271.
*
* The OSKit is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GPL for more details. You should have
* received a copy of the GPL along with the OSKit; see the file COPYING. If
* not, write to the FSF, 59 Temple Place #330, Boston, MA 02111-1307, USA.
*/
#ifdef PTHREAD_SCHED_STRIDE
#include <threads/pthread_internal.h>
#include "pthread_signal.h"
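/*
* Stride Scheduler, as described in C. A. Waldspurger and W. E. Weihl,
* "Stride Scheduling: Deterministic Proportional-Share Resource
* Management", MIT/LCS/TM-528, 1995. Original (likely defunct) URL:
* http://www.research.digital.com/SRC/personal/caw/papers/stride-mit-tm528.ps
*/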
/*
 * Deliberately a non-static, non-weak definition: referencing this
 * symbol from elsewhere forces the stride scheduler to be linked in.
 */
int __drag_in_stride_scheduler__;

/* Run queue, kept sorted by ascending pass value, and its length. */
static queue_head_t	stride_runq;
static int		stride_runq_count;

/*
 * Aggregate state across all clients that have joined (runnable or
 * running): total tickets, the matching global stride, the global pass,
 * and the time base used by global_pass_update().
 */
static int		global_tickets;
static int		global_stride;
static long long	global_pass;
static long long	lastupdate;

/* Nonzero: dump the run queue every time the next thread is chosen. */
static int		stride_debug = 0;
/*
 * Stride constants.
 *
 * STRIDE1 is the stride of a client holding a single ticket; a client
 * holding t tickets has stride STRIDE1 / t (see client_init()).
 * QUANTUM is one scheduler tick (PTHREAD_TICK) multiplied by SCALE.
 */
#define STRIDE1		(1 << 20)
#define SCALE		1000000
#define QUANTUM		(PTHREAD_TICK * SCALE)
/*
 * Shorthand accessors for the per-thread stride state. The argument is
 * parenthesized so that an expression argument expands correctly.
 */
#define TICKETS(p)	((p)->tickets)
#define STRIDE(p)	((p)->stride)
#define PASS(p)		((p)->pass)
#define REMAIN(p)	((p)->remain)
#define START(p)	((p)->start)
/* pthread_scheduler.c */
extern pthread_lock_t pthread_sched_lock;
#ifdef STRIDE_DISABLE
/* Nonzero while stride scheduling is disabled. */
static int stride_disabled;
/* global_pass as saved by stride_scheduler_disable(). */
static long long global_pass_save;
/* Ticket count every thread is treated as holding while disabled. */
#define FIXEDTICKETS 100
#endif
/*
* These are the internal routines.
*/
/*
* Determine if a pthread is on the runq. Use a separate field
* since using the flags would require locking the thread. Use the
* queue chain pointer instead, setting it to zero when a thread is
* removed from the queue.
*/
static inline int
stride_runq_onrunq(pthread_thread_t *pthread)
{
	/*
	 * Returns nonzero iff the thread is linked on the runq. Compare
	 * rather than cast: truncating a pointer to int can yield 0 for a
	 * non-NULL pointer on targets where pointers are wider than int.
	 */
	return pthread->runq.next != (queue_entry_t) 0;
}
/*
* Add and remove threads from the runq. The runq lock should be locked,
* and interrupts disabled.
*/
/*
* Insert into the runq. Simple ordered queue. Might need to be better!
*/
static void
stride_runq_insert(pthread_thread_t *pthread)
{
	pthread_thread_t *cursor;
	int placed = 0;

	/*
	 * Place the thread before the first entry with a strictly larger
	 * pass; threads with equal pass therefore stay in FIFO order.
	 */
	if (!queue_empty(&stride_runq)) {
		queue_iterate(&stride_runq, cursor, pthread_thread_t *, runq) {
			if (PASS(cursor) > PASS(pthread)) {
				queue_enter_before(&stride_runq, cursor,
				    pthread, pthread_thread_t *, runq);
				placed = 1;
				break;
			}
		}
	}

	/* Empty queue, or larger pass than everything queued: append. */
	if (!placed)
		queue_enter(&stride_runq, pthread, pthread_thread_t *, runq);

	stride_runq_count++;
}
/*
* Dequeue the highest priority thread, which is the first thread since
* the list is ordered by PASS.
*/
static pthread_thread_t *
stride_runq_dequeue(void)
{
	pthread_thread_t *head;

	/* The runq is sorted by ascending pass, so the head runs next. */
	queue_remove_first(&stride_runq, head, pthread_thread_t *, runq);
	stride_runq_count--;

	/* A NULL chain pointer marks the thread as no longer on the runq. */
	head->runq.next = (queue_entry_t) 0;

	/* Record dispatch time; thread_pass_update() charges from here. */
	START(head) = oskit_pthread_realtime();

	return head;
}
/*
* Remove an arbitrary thread from the runq.
*/
static inline void
stride_runq_remove(pthread_thread_t *pthread)
{
	/*
	 * Unlink the thread, then clear its chain pointer so that
	 * stride_runq_onrunq() reports it as off the queue.
	 */
	queue_remove(&stride_runq, pthread, pthread_thread_t *, runq);
	stride_runq_count -= 1;
	pthread->runq.next = (queue_entry_t) 0;
}
/*
* Update global pass based on elapsed real time.
*/
static void
global_pass_update(void)
{
long long elapsed;
#ifdef STRIDE_DISABLE
/* charge everyone for a full tick */
if (stride_disabled)
elapsed = PTHREAD_TICK;
else
#endif
elapsed = oskit_pthread_realtime() - lastupdate;
lastupdate += elapsed;
global_pass += ((long long) global_stride *
(elapsed * SCALE)) / QUANTUM;
assert(global_pass >= 0);
}
/*
* Update thread pass when a thread does not consume its entire quantum
* but is put back on the runq (for example, because it was preempted).
* Also used to charge a thread for the time it ran before blocking.
*/
static void
thread_pass_update(pthread_thread_t *pthread)
{
	/* Time the thread has run since it was dispatched. */
	long long elapsed = oskit_pthread_realtime() - START(pthread);
	int stride = STRIDE(pthread);

#ifdef STRIDE_DISABLE
	/* While disabled, every thread is charged the fixed-ticket stride. */
	if (stride_disabled)
		stride = STRIDE1 / FIXEDTICKETS;
#endif
	/*
	 * Charge one stride per PTHREAD_TICK of run time. This is the same
	 * quotient as (stride * elapsed * SCALE) / QUANTUM, because the
	 * SCALE factors cancel. Omitting them keeps the intermediate
	 * product from overflowing long long on long run intervals.
	 */
	PASS(pthread) += ((long long) stride * elapsed) / PTHREAD_TICK;
	assert(PASS(pthread) >= 0);
}
/*
* Update global tickets and stride to reflect change in runq.
*/
static void
global_tickets_update(int delta)
{
#ifdef STRIDE_DISABLE
	/* While disabled, every thread counts as exactly FIXEDTICKETS. */
	if (stride_disabled)
		delta = (delta < 0) ? -FIXEDTICKETS : FIXEDTICKETS;
#endif
	global_tickets += delta;
	assert(global_tickets >= 0);

	/*
	 * XXX: With no tickets left (the last thread has left) there is no
	 * meaningful stride, so fall back to STRIDE1, the stride of a single
	 * ticket. The paper says nothing about this case.
	 */
	global_stride = (global_tickets != 0) ? STRIDE1 / global_tickets
					      : STRIDE1;
}
/*
* Initialize the tickets for a new thread to some reasonable value.
*/
static void
client_init(pthread_thread_t *pthread, int tickets)
{
	/* Stride is inversely proportional to the ticket allocation. */
	STRIDE(pthread) = STRIDE1 / tickets;
	TICKETS(pthread) = tickets;

	/* A fresh client starts with one full stride of remaining pass. */
	REMAIN(pthread) = STRIDE(pthread);
}
/*
* Join the party.
*/
static void
client_join(pthread_thread_t *pthread)
{
	/* Bring global_pass up to date before basing this thread's pass on it. */
	global_pass_update();
	PASS(pthread) = REMAIN(pthread) + global_pass;

	/* Add the thread's tickets to the pool, then queue it in pass order. */
	global_tickets_update(TICKETS(pthread));
	stride_runq_insert(pthread);
}
/*
* Leave the party
*/
static void
client_leave(pthread_thread_t *pthread)
{
	int no_credit = 0;

	global_pass_update();
#ifdef STRIDE_DISABLE
	/* No credit for unused time while stride scheduling is disabled. */
	no_credit = stride_disabled;
#endif
	/* Remember how far ahead of global_pass the thread was when it left. */
	if (no_credit)
		REMAIN(pthread) = 0;
	else
		REMAIN(pthread) = PASS(pthread) - global_pass;

	global_tickets_update(-TICKETS(pthread));
}
/*
* Change priority
*/
static void
client_modify(pthread_thread_t *pthread, int current, int tickets)
{
	int remain, stride, queued;

	/*
	 * Take the thread's tickets out of the global pool. A queued thread
	 * leaves fully (recording its remaining pass); the running thread
	 * only gives back its tickets.
	 */
	queued = stride_runq_onrunq(pthread);
	if (queued) {
		stride_runq_remove(pthread);
		client_leave(pthread);
	}
	else if (current)
		global_tickets_update(-TICKETS(pthread));

	/*
	 * Rescale the remaining pass to the new stride. Multiply in long
	 * long: REMAIN and the new stride can each approach STRIDE1 (2^20),
	 * so their product overflows int.
	 */
	stride = STRIDE1 / tickets;
	remain = (int) (((long long) REMAIN(pthread) * stride) /
	    STRIDE(pthread));

	TICKETS(pthread) = tickets;
	STRIDE(pthread) = stride;
	REMAIN(pthread) = remain;

	/* Put the thread's (new) tickets back the same way they came out. */
	if (queued)
		client_join(pthread);
	else if (current)
		global_tickets_update(TICKETS(pthread));
}
/*
* Debug
*/
static void
stride_runq_debug(void)
{
pthread_thread_t *pthread;
printf(__FUNCTION__ ": GT %d GS %d GP %qd LU %qd\n",
global_tickets, global_stride, global_pass, lastupdate);
queue_iterate(&stride_runq, pthread, pthread_thread_t *, runq) {
printf("%p(%d) T %d S %d P %qd R %d S %qd\n",
pthread, (int) pthread->tid,
TICKETS(pthread), STRIDE(pthread), PASS(pthread),
REMAIN(pthread), START(pthread));
}
}
/*
* These are the "exported" routines that are required by pthread_scheduler.c
*/
void
stride_sched_init(void)
{
	/* Start out with an empty, pass-ordered run queue. */
	queue_init(&stride_runq);
}
/*
* Return true if this scheduler schedules the given policy.
*/
int
stride_sched_schedules(int policy)
{
	/* Only SCHED_STRIDE belongs to this scheduler (1 if so, else 0). */
	return policy == SCHED_STRIDE;
}
/*
* Make a thread runnable. Called from the scheduler module. Global scheduler
* lock is locked already!
*/
int
stride_sched_setrunnable(pthread_thread_t *pthread)
{
	int resched = 0;

	/*
	 * Print the pointer with %p: casting it to int would truncate it
	 * wherever pointers are wider than int.
	 */
	if (stride_runq_onrunq(pthread))
		panic("%s: Already on runQ: %p(%d)", __FUNCTION__,
		      pthread, (int) pthread->tid);

	client_join(pthread);
#ifdef PTHREAD_SCHED_POSIX
	/*
	 * If there is a POSIX thread running, preempt it so that the
	 * scheduler can pick the next task, if that's what it wants to
	 * do.
	 */
	if (SCHED_POLICY_POSIX(CURPTHREAD()->policy))
		resched = PREEMPT_NEEDED = 1;
#endif
	/* Nonzero tells the caller a reschedule was requested. */
	return resched;
}
/*
* Terminate our association with a thread, as would happen if the
* scheduler policy is changed to another scheduler type like realtime.
*
* Note, it probably makes no sense to change the policy from one type to
* another, but you never know ...
*/
void
stride_sched_disassociate(pthread_thread_t *pthread)
{
	pthread_lock(&pthread_sched_lock);

	/* Only a queued thread holds runq/ticket state that must be undone. */
	if (!stride_runq_onrunq(pthread))
		goto out;

	stride_runq_remove(pthread);
	client_leave(pthread);
out:
	pthread_unlock(&pthread_sched_lock);
}
/*
* Validate proposed scheduler state. Returns 0 if ok, an error otherwise.
*/
int
stride_sched_check_schedstate(const struct sched_param *param)
{
	/* Tickets must lie in (0, STRIDE1] so that STRIDE1 / tickets >= 1. */
	if (param->tickets <= 0 || param->tickets > STRIDE1)
		return EINVAL;
	return 0;
}
/*
* Initialize scheduling parameters for a newly created thread. Note,
* we don't need any locking since there are no side effects.
*/
void
stride_sched_init_schedstate(pthread_thread_t *pthread,
			     const struct sched_param *param)
{
	static int initdone;

	/* __FUNCTION__ is an argument, not pasted onto the format string. */
	if (pthread->policy != SCHED_STRIDE)
		panic("%s: Bad policy specified", __FUNCTION__);
	if (param->tickets <= 0 || param->tickets > STRIDE1)
		panic("%s: Bad ticket value specified", __FUNCTION__);

	assert_preemption_enabled();
	disable_preemption();
	pthread_lock(&pthread_sched_lock);

	client_init(pthread, param->tickets);

	/*
	 * XXX!!!
	 * The first thread initialized here must be the main thread, which
	 * is already running and is not added via client_join(). Account
	 * for its tickets and start its pass at the current global pass.
	 */
	if (! initdone) {
		global_tickets_update(TICKETS(pthread));
		global_pass_update();
		PASS(pthread) = global_pass;
		initdone = 1;
	}

	/*
	 * Set the thread priority. We do this so that we can make a
	 * comparison between stride and POSIX threads. I'm not sure there
	 * are any good models for this. This works okay for keeping queues
	 * in proper priority order (say, a mutex waiters queue).
	 *
	 * We use priorities greater than PRIORITY_MAX so that there is
	 * no interference with POSIX threads, and so that a stride
	 * thread always appears to have higher priority than any POSIX
	 * thread. This forces rescheduling when appropriate.
	 */
	pthread->priority = PRIORITY_MAX + 5;
	pthread->base_priority = pthread->priority;
#ifdef PTHREAD_SCHED_POSIX
	/*
	 * If there is a POSIX thread running, preempt it so that the
	 * scheduler can pick the next task, if that's what it wants to
	 * do.
	 */
	if (SCHED_POLICY_POSIX(CURPTHREAD()->policy))
		PREEMPT_NEEDED = 1;
#endif
	pthread_unlock(&pthread_sched_lock);
	enable_preemption();
}
/*
* Change thread scheduler state. pthread is locked. interrupts are disabled.
*/
int
stride_sched_change_state(pthread_thread_t *pthread,
			  const struct sched_param *param)
{
	int resched = 0;

	/* __FUNCTION__ is an argument, not pasted onto the format string. */
	if (pthread->policy != SCHED_STRIDE)
		panic("%s: Bad policy specified", __FUNCTION__);
	if (param->tickets <= 0 || param->tickets > STRIDE1)
		panic("%s: Bad ticket value specified", __FUNCTION__);

	pthread_lock(&pthread_sched_lock);

	/* Re-ticket the thread, keeping global accounting consistent. */
	client_modify(pthread, CURPTHREAD() == pthread, param->tickets);

	/*
	 * Set the thread priority. We do this so that we can make a
	 * comparison between stride and POSIX threads. I'm not sure there
	 * are any good models for this. This works okay for keeping queues
	 * in proper priority order (say, a mutex waiters queue).
	 *
	 * We use priorities greater than PRIORITY_MAX so that there is
	 * no interference with POSIX threads, and so that a stride
	 * thread always appears to have higher priority than any POSIX
	 * thread. This forces rescheduling when appropriate.
	 */
	pthread->priority = PRIORITY_MAX + 5;
	pthread->base_priority = pthread->priority;
#ifdef PTHREAD_SCHED_POSIX
	/*
	 * If a POSIX-policy thread is running, preempt it so that the
	 * scheduler can pick a stride thread, if that's what it wants
	 * to do.
	 */
	if (SCHED_POLICY_POSIX(CURPTHREAD()->policy))
		resched = PREEMPT_NEEDED = 1;
#endif
	pthread_unlock(&pthread_sched_lock);

	/* Nonzero tells the caller a reschedule was requested. */
	return resched;
}
/*
* Dispatch a thread back to the runq. The global scheduler lock is locked.
*/
int
stride_sched_dispatch(resched_flags_t reason, pthread_thread_t *pthread)
{
	/* This scheduler never asks for the thread to be rerun. */
	int rerun = 0;

	switch (reason) {
	case RESCHED_USERYIELD:
	case RESCHED_YIELD:
	case RESCHED_PREEMPT:
		/*
		 * The thread yields or is preempted but stays runnable:
		 * charge it for the time it ran and requeue it by its new
		 * pass.
		 */
		assert(! stride_runq_onrunq(pthread));
		thread_pass_update(pthread);
		stride_runq_insert(pthread);
		break;

	case RESCHED_INTERNAL:
		/*
		 * This will be the idle thread. It gets invoked explicitly
		 * by the scheduler, so its tickets/pass/stride/whatever
		 * do not matter.
		 */
		break;

	default:
		/*
		 * All other rescheduling reasons are blocks. Charge the
		 * thread for the time it ran and withdraw its tickets
		 * (client_leave() records its remaining pass). It is not
		 * put back on the runq.
		 */
		if (pthread == IDLETHREAD)
			panic("%s: Idlethread!\n", __FUNCTION__);
		thread_pass_update(pthread);
		client_leave(pthread);
		break;
	}
	return rerun;
}
/*
* Return the highest priority thread ready to run. Scheduler lock is locked.
*/
pthread_thread_t *
stride_sched_thread_next(void)
{
	if (stride_debug)
		stride_runq_debug();

	/* The lowest pass runs next; an empty runq yields a null pointer. */
	return queue_empty(&stride_runq) ? 0 : stride_runq_dequeue();
}
int
stride_sched_priority_bump(pthread_thread_t *pthread, int newprio)
{
	/* Priority inheritance/bumping is not implemented for stride threads. */
	panic("%s: Not supported!", __FUNCTION__);

	/*
	 * Not reached, assuming panic() does not return. This keeps the
	 * non-void function from falling off its end.
	 */
	return 0;
}
#ifdef STRIDE_DISABLE
/*
* Re-enable stride scheduling after stride_scheduler_disable().
*
* Returns immediately if stride scheduling is not currently disabled.
* Otherwise, with interrupts disabled:
* - clears stride_disabled;
* - rebuilds the global ticket count from the real TICKETS of the
*   current thread and of every thread on the runq;
* - sets each such thread's pass to global_pass_save (the global pass
*   saved at disable time) plus one full stride of its own (the
*   STRIDE * (PTHREAD_TICK * SCALE) / QUANTUM term is exactly one
*   stride);
* - re-sorts the runq by reinserting every thread;
* - sets global_pass to the pass of the new runq head, or to the
*   current thread's pass if the runq is empty.
*
* NOTE(review): stride_runq_debug() is called unconditionally here (not
* gated on stride_debug), so every enable prints the runq.
* NOTE(review): lastupdate is not resynchronized. While disabled,
* global_pass_update() advances lastupdate by PTHREAD_TICK per call
* rather than by real time, so the first global_pass_update() after
* enabling may charge an arbitrary (possibly negative) interval; confirm.
*/
void
stride_scheduler_enable(void)
{
int enabled;
pthread_thread_t *ptmp;
queue_head_t runq_copy;
if (!stride_disabled)
return;
enabled = save_disable_interrupts();
stride_runq_debug();
queue_init(&runq_copy);
/* Restart the ticket pool from scratch, beginning with the running thread. */
ptmp = CURPTHREAD();
stride_disabled = 0;
global_tickets = 0;
global_tickets_update(TICKETS(ptmp));
PASS(ptmp) = global_pass_save +
(STRIDE(ptmp) * (PTHREAD_TICK * SCALE)) / QUANTUM;
global_pass = PASS(ptmp);
/* Recounted by stride_runq_insert() as threads are reinserted below. */
stride_runq_count = 0;
if (queue_empty(&stride_runq)) {
/* Only the running thread is active; note interrupts are restored before the dump here. */
restore_interrupt_enable(enabled);
stride_runq_debug();
return;
}
/* Drain the runq into a temporary list ... */
while (! queue_empty(&stride_runq)) {
queue_remove_first(&stride_runq,
ptmp, pthread_thread_t *, runq);
queue_enter(&runq_copy, ptmp, pthread_thread_t *, runq);
}
/* ... then reinsert each thread with its recomputed pass so the runq is pass-ordered again. */
while (! queue_empty(&runq_copy)) {
queue_remove_first(&runq_copy,
ptmp, pthread_thread_t *, runq);
PASS(ptmp) = global_pass_save +
(STRIDE(ptmp) * (PTHREAD_TICK * SCALE)) / QUANTUM;
global_tickets_update(TICKETS(ptmp));
stride_runq_insert(ptmp);
}
/* The global pass restarts at the smallest queued pass. */
ptmp = (pthread_thread_t *) queue_first(&stride_runq);
global_pass = PASS(ptmp);
stride_runq_debug();
restore_interrupt_enable(enabled);
}
/*
* Switch to "disabled" mode, in which every thread is accounted as holding
* FIXEDTICKETS tickets (see global_tickets_update() and
* thread_pass_update()).
*
* With interrupts disabled:
* - saves global_pass in global_pass_save for stride_scheduler_enable();
* - sets stride_disabled;
* - resets the ticket pool to FIXEDTICKETS for the current thread plus
*   FIXEDTICKETS per runq entry;
* - sets each such thread's pass to global_pass plus one full stride of
*   its own (STRIDE(p)).
*
* NOTE(review): the new pass uses each thread's own STRIDE rather than the
* fixed stride, and the runq is not re-sorted, so the runq may no longer
* be in pass order after this call.
* NOTE(review): unlike enable, there is no early return when already
* disabled; a second call overwrites global_pass_save.
* NOTE(review): stride_runq_debug() is called unconditionally.
*/
void
stride_scheduler_disable(void)
{
int enabled;
pthread_thread_t *ptmp;
enabled = save_disable_interrupts();
stride_runq_debug();
ptmp = CURPTHREAD();
global_pass_save = global_pass;
stride_disabled = 1;
/* Rebuild the pool: the running thread first, then one share per queued thread. */
global_tickets = 0;
global_tickets_update(FIXEDTICKETS);
PASS(ptmp) = global_pass +
(STRIDE(ptmp) * (PTHREAD_TICK * SCALE)) / QUANTUM;
queue_iterate(&stride_runq, ptmp, pthread_thread_t *, runq) {
PASS(ptmp) = global_pass +
(STRIDE(ptmp) * (PTHREAD_TICK * SCALE)) / QUANTUM;
global_tickets_update(FIXEDTICKETS);
}
stride_runq_debug();
restore_interrupt_enable(enabled);
}
#endif
#endif