mirror of
https://github.com/SquidDev-CC/CC-Tweaked
synced 2024-12-12 11:10:29 +00:00
Implement a CFS based scheduler
- The computer queue is a priority queue sorted by "virtual runtime". - Virtual runtime is based on the time this task has executed, divided by the number of pending tasks. - We try to execute every task within a given period. Each computer is allocated a fair share of that period, depending how many tasks are in the queue. Once a computer has used more than that period, the computer is paused and the next one resumed.
This commit is contained in:
parent
b3e6a53868
commit
a125a19728
@ -103,6 +103,20 @@ final class ComputerExecutor
|
|||||||
*/
|
*/
|
||||||
volatile boolean onComputerQueue = false;
|
volatile boolean onComputerQueue = false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The amount of time this computer has used on a theoretical machine which shares work evenly amongst computers.
|
||||||
|
*
|
||||||
|
* @see ComputerThread
|
||||||
|
*/
|
||||||
|
long virtualRuntime = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The last time at which we updated {@link #virtualRuntime}.
|
||||||
|
*
|
||||||
|
* @see ComputerThread
|
||||||
|
*/
|
||||||
|
long vRuntimeStart;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The command that {@link #work()} should execute on the computer thread.
|
* The command that {@link #work()} should execute on the computer thread.
|
||||||
*
|
*
|
||||||
@ -280,9 +294,7 @@ final class ComputerExecutor
|
|||||||
{
|
{
|
||||||
synchronized( queueLock )
|
synchronized( queueLock )
|
||||||
{
|
{
|
||||||
if( onComputerQueue ) return;
|
if( !onComputerQueue ) ComputerThread.queue( this );
|
||||||
onComputerQueue = true;
|
|
||||||
ComputerThread.queue( this );
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -482,14 +494,16 @@ final class ComputerExecutor
|
|||||||
*/
|
*/
|
||||||
void beforeWork()
|
void beforeWork()
|
||||||
{
|
{
|
||||||
|
vRuntimeStart = System.nanoTime();
|
||||||
timeout.startTimer();
|
timeout.startTimer();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called after executing {@link #work()}. Adds this back to the {@link ComputerThread} if we have more work,
|
* Called after executing {@link #work()}.
|
||||||
* otherwise remove it.
|
*
|
||||||
|
* @return If we have more work to do.
|
||||||
*/
|
*/
|
||||||
void afterWork()
|
boolean afterWork()
|
||||||
{
|
{
|
||||||
if( interruptedEvent )
|
if( interruptedEvent )
|
||||||
{
|
{
|
||||||
@ -502,16 +516,12 @@ final class ComputerExecutor
|
|||||||
|
|
||||||
Tracking.addTaskTiming( getComputer(), timeout.nanoCurrent() );
|
Tracking.addTaskTiming( getComputer(), timeout.nanoCurrent() );
|
||||||
|
|
||||||
|
if( interruptedEvent ) return true;
|
||||||
|
|
||||||
synchronized( queueLock )
|
synchronized( queueLock )
|
||||||
{
|
{
|
||||||
if( !interruptedEvent && eventQueue.isEmpty() && command == null )
|
if( eventQueue.isEmpty() && command == null ) return onComputerQueue = false;
|
||||||
{
|
return true;
|
||||||
onComputerQueue = false;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ComputerThread.queue( this );
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -10,11 +10,13 @@ import dan200.computercraft.ComputerCraft;
|
|||||||
import dan200.computercraft.shared.util.ThreadUtils;
|
import dan200.computercraft.shared.util.ThreadUtils;
|
||||||
|
|
||||||
import javax.annotation.Nonnull;
|
import javax.annotation.Nonnull;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.LockSupport;
|
import java.util.concurrent.locks.LockSupport;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import static dan200.computercraft.core.computer.TimeoutState.ABORT_TIMEOUT;
|
import static dan200.computercraft.core.computer.TimeoutState.ABORT_TIMEOUT;
|
||||||
import static dan200.computercraft.core.computer.TimeoutState.TIMEOUT;
|
import static dan200.computercraft.core.computer.TimeoutState.TIMEOUT;
|
||||||
@ -34,16 +36,32 @@ public class ComputerThread
|
|||||||
*/
|
*/
|
||||||
private static final int MONITOR_WAKEUP = 100;
|
private static final int MONITOR_WAKEUP = 100;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The target latency between executing two tasks on a single machine.
|
||||||
|
*
|
||||||
|
* An average tick takes 50ms, and so we ideally need to have handled a couple of events within that window in order
|
||||||
|
* to have a perceived low latency.
|
||||||
|
*/
|
||||||
|
private static final long DEFAULT_LATENCY = TimeUnit.MILLISECONDS.toNanos( 50 );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The minimum value that {@link #DEFAULT_LATENCY} can have when scaled.
|
||||||
|
*
|
||||||
|
* From statistics gathered on SwitchCraft, almost all machines will execute under 15ms, 75% under 1.5ms, with the
|
||||||
|
* mean being about 3ms. Most computers shouldn't be too impacted with having such a short period to execute in.
|
||||||
|
*/
|
||||||
|
private static final long DEFAULT_MIN_PERIOD = TimeUnit.MILLISECONDS.toNanos( 5 );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of tasks before we have to start scaling latency linearly.
|
||||||
|
*/
|
||||||
|
private static final long LATENCY_MAX_TASKS = DEFAULT_LATENCY / DEFAULT_MIN_PERIOD;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lock used for modifications to the array of current threads.
|
* Lock used for modifications to the array of current threads.
|
||||||
*/
|
*/
|
||||||
private static final Object threadLock = new Object();
|
private static final Object threadLock = new Object();
|
||||||
|
|
||||||
/**
|
|
||||||
* Active executors to run
|
|
||||||
*/
|
|
||||||
private static final BlockingQueue<ComputerExecutor> computersActive = new LinkedBlockingQueue<>();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether the computer thread system is currently running
|
* Whether the computer thread system is currently running
|
||||||
*/
|
*/
|
||||||
@ -59,6 +77,29 @@ public class ComputerThread
|
|||||||
*/
|
*/
|
||||||
private static TaskRunner[] runners;
|
private static TaskRunner[] runners;
|
||||||
|
|
||||||
|
private static long latency;
|
||||||
|
private static long minPeriod;
|
||||||
|
|
||||||
|
private static final ReentrantLock computerLock = new ReentrantLock();
|
||||||
|
|
||||||
|
private static final Condition hasWork = computerLock.newCondition();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Active queues to execute
|
||||||
|
*/
|
||||||
|
private static final TreeSet<ComputerExecutor> computerQueue = new TreeSet<>( ( a, b ) -> {
|
||||||
|
if( a == b ) return 0; // Should never happen, but let's be consistent here
|
||||||
|
|
||||||
|
long at = a.virtualRuntime, bt = b.virtualRuntime;
|
||||||
|
if( at == bt ) return Integer.compare( a.hashCode(), b.hashCode() );
|
||||||
|
return Long.compare( at, bt );
|
||||||
|
} );
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The minimum {@link ComputerExecutor#virtualRuntime} time on the tree.
|
||||||
|
*/
|
||||||
|
private static long minimumVirtualRuntime = 0;
|
||||||
|
|
||||||
private static final ThreadFactory monitorFactory = ThreadUtils.factory( "Computer-Monitor" );
|
private static final ThreadFactory monitorFactory = ThreadUtils.factory( "Computer-Monitor" );
|
||||||
private static final ThreadFactory runnerFactory = ThreadUtils.factory( "Computer-Runner" );
|
private static final ThreadFactory runnerFactory = ThreadUtils.factory( "Computer-Runner" );
|
||||||
|
|
||||||
@ -76,6 +117,12 @@ public class ComputerThread
|
|||||||
{
|
{
|
||||||
// TODO: Resize this + kill old runners and start new ones.
|
// TODO: Resize this + kill old runners and start new ones.
|
||||||
runners = new TaskRunner[ComputerCraft.computer_threads];
|
runners = new TaskRunner[ComputerCraft.computer_threads];
|
||||||
|
|
||||||
|
// latency and minPeriod are scaled by 1 + floor(log2(threads)). We can afford to execute tasks for
|
||||||
|
// longer when executing on more than one thread.
|
||||||
|
long factor = 64 - Long.numberOfLeadingZeros( runners.length );
|
||||||
|
latency = DEFAULT_LATENCY * factor;
|
||||||
|
minPeriod = DEFAULT_MIN_PERIOD * factor;
|
||||||
}
|
}
|
||||||
|
|
||||||
for( int i = 0; i < runners.length; i++ )
|
for( int i = 0; i < runners.length; i++ )
|
||||||
@ -112,18 +159,125 @@ public class ComputerThread
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
computersActive.clear();
|
computerQueue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mark a computer as having work, enqueuing it on the thread.
|
* Mark a computer as having work, enqueuing it on the thread.
|
||||||
*
|
*
|
||||||
* @param computer The computer to execute work on.
|
* @param executor The computer to execute work on.
|
||||||
*/
|
*/
|
||||||
static void queue( @Nonnull ComputerExecutor computer )
|
static void queue( @Nonnull ComputerExecutor executor )
|
||||||
{
|
{
|
||||||
if( !computer.onComputerQueue ) throw new IllegalStateException( "Computer must be on queue" );
|
computerLock.lock();
|
||||||
computersActive.add( computer );
|
try
|
||||||
|
{
|
||||||
|
if( executor.onComputerQueue ) throw new IllegalStateException( "Cannot queue already queued executor" );
|
||||||
|
executor.onComputerQueue = true;
|
||||||
|
|
||||||
|
updateRuntimes();
|
||||||
|
|
||||||
|
// We're not currently on the queue, so update its current execution time to
|
||||||
|
// ensure its at least as high as the minimum.
|
||||||
|
long newRuntime = minimumVirtualRuntime;
|
||||||
|
|
||||||
|
if( executor.virtualRuntime == 0 )
|
||||||
|
{
|
||||||
|
// Slow down new computers a little bit.
|
||||||
|
newRuntime += scaledPeriod();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Give a small boost to computers which have slept a little.
|
||||||
|
newRuntime -= latency / 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
executor.virtualRuntime = Math.max( newRuntime, executor.virtualRuntime );
|
||||||
|
|
||||||
|
// Add to the queue, and signal the workers.
|
||||||
|
computerQueue.add( executor );
|
||||||
|
hasWork.signal();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
computerLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the {@link ComputerExecutor#virtualRuntime}s of all running tasks, and then increment the
|
||||||
|
* {@link #minimumVirtualRuntime} of the executor.
|
||||||
|
*/
|
||||||
|
private static void updateRuntimes()
|
||||||
|
{
|
||||||
|
long minRuntime = Long.MAX_VALUE;
|
||||||
|
|
||||||
|
// If we've a task on the queue, use that as our base time.
|
||||||
|
if( !computerQueue.isEmpty() ) minRuntime = computerQueue.first().virtualRuntime;
|
||||||
|
|
||||||
|
// Update all the currently executing tasks
|
||||||
|
TaskRunner[] currentRunners = runners;
|
||||||
|
if( currentRunners != null )
|
||||||
|
{
|
||||||
|
long now = System.nanoTime();
|
||||||
|
int tasks = 1 + computerQueue.size();
|
||||||
|
for( TaskRunner runner : currentRunners )
|
||||||
|
{
|
||||||
|
if( runner == null ) continue;
|
||||||
|
ComputerExecutor executor = runner.currentExecutor.get();
|
||||||
|
if( executor == null ) continue;
|
||||||
|
|
||||||
|
// We do two things here: first we update the task's virtual runtime based on when we
|
||||||
|
// last checked, and then we check the minimum.
|
||||||
|
minRuntime = Math.min( minRuntime, executor.virtualRuntime += (now - executor.vRuntimeStart) / tasks );
|
||||||
|
executor.vRuntimeStart = now;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if( minRuntime > minimumVirtualRuntime && minRuntime < Long.MAX_VALUE )
|
||||||
|
{
|
||||||
|
minimumVirtualRuntime = minRuntime;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Re-add this task to the queue
|
||||||
|
*
|
||||||
|
* @param executor The executor to requeue
|
||||||
|
*/
|
||||||
|
private static void afterWork( ComputerExecutor executor )
|
||||||
|
{
|
||||||
|
computerLock.lock();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
updateRuntimes();
|
||||||
|
|
||||||
|
// Add to the queue, and signal the workers.
|
||||||
|
if( !executor.afterWork() ) return;
|
||||||
|
|
||||||
|
computerQueue.add( executor );
|
||||||
|
hasWork.signal();
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
computerLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The scaled period for a single task
|
||||||
|
*
|
||||||
|
* @return The scaled period for the task
|
||||||
|
* @see #DEFAULT_LATENCY
|
||||||
|
* @see #DEFAULT_MIN_PERIOD
|
||||||
|
* @see #LATENCY_MAX_TASKS
|
||||||
|
*/
|
||||||
|
static long scaledPeriod()
|
||||||
|
{
|
||||||
|
// +1 to include the current task
|
||||||
|
int count = 1 + computerQueue.size();
|
||||||
|
return count < LATENCY_MAX_TASKS ? latency / count : minPeriod;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -133,7 +287,7 @@ public class ComputerThread
|
|||||||
*/
|
*/
|
||||||
static boolean hasPendingWork()
|
static boolean hasPendingWork()
|
||||||
{
|
{
|
||||||
return computersActive.size() > 0;
|
return computerQueue.size() > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -184,7 +338,7 @@ public class ComputerThread
|
|||||||
runner.running = false;
|
runner.running = false;
|
||||||
|
|
||||||
ComputerExecutor thisExecutor = runner.currentExecutor.getAndSet( null );
|
ComputerExecutor thisExecutor = runner.currentExecutor.getAndSet( null );
|
||||||
if( thisExecutor != null ) executor.afterWork();
|
if( thisExecutor != null ) afterWork( executor );
|
||||||
|
|
||||||
synchronized( threadLock )
|
synchronized( threadLock )
|
||||||
{
|
{
|
||||||
@ -213,7 +367,7 @@ public class ComputerThread
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pulls tasks from the {@link #computersActive} queue and runs them.
|
* Pulls tasks from the {@link #computerQueue} queue and runs them.
|
||||||
*
|
*
|
||||||
* This is responsible for running the {@link ComputerExecutor#work()}, {@link ComputerExecutor#beforeWork()} and
|
* This is responsible for running the {@link ComputerExecutor#work()}, {@link ComputerExecutor#beforeWork()} and
|
||||||
* {@link ComputerExecutor#afterWork()} functions. Everything else is either handled by the executor, timeout
|
* {@link ComputerExecutor#afterWork()} functions. Everything else is either handled by the executor, timeout
|
||||||
@ -237,7 +391,17 @@ public class ComputerThread
|
|||||||
ComputerExecutor executor;
|
ComputerExecutor executor;
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
executor = computersActive.take();
|
computerLock.lockInterruptibly();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
while( computerQueue.isEmpty() ) hasWork.await();
|
||||||
|
executor = computerQueue.pollFirst();
|
||||||
|
assert executor != null : "hasWork should ensure we never receive null work";
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
computerLock.unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch( InterruptedException ignored )
|
catch( InterruptedException ignored )
|
||||||
{
|
{
|
||||||
@ -262,7 +426,7 @@ public class ComputerThread
|
|||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
ComputerExecutor thisExecutor = currentExecutor.getAndSet( null );
|
ComputerExecutor thisExecutor = currentExecutor.getAndSet( null );
|
||||||
if( thisExecutor != null ) executor.afterWork();
|
if( thisExecutor != null ) afterWork( executor );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -27,11 +27,6 @@ import java.util.concurrent.TimeUnit;
|
|||||||
*/
|
*/
|
||||||
public final class TimeoutState
|
public final class TimeoutState
|
||||||
{
|
{
|
||||||
/**
|
|
||||||
* The time to run a task before pausing in nanoseconds
|
|
||||||
*/
|
|
||||||
private static final long TIMESLICE = TimeUnit.MILLISECONDS.toNanos( 40 );
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The total time a task is allowed to run before aborting in nanoseconds
|
* The total time a task is allowed to run before aborting in nanoseconds
|
||||||
*/
|
*/
|
||||||
@ -53,6 +48,7 @@ public final class TimeoutState
|
|||||||
|
|
||||||
private long nanoCumulative;
|
private long nanoCumulative;
|
||||||
private long nanoCurrent;
|
private long nanoCurrent;
|
||||||
|
private long nanoDeadline;
|
||||||
|
|
||||||
long nanoCumulative()
|
long nanoCumulative()
|
||||||
{
|
{
|
||||||
@ -70,7 +66,7 @@ public final class TimeoutState
|
|||||||
public void refresh()
|
public void refresh()
|
||||||
{
|
{
|
||||||
long now = System.nanoTime();
|
long now = System.nanoTime();
|
||||||
if( !paused ) paused = (now - nanoCurrent) >= TIMESLICE;
|
if( !paused ) paused = now >= nanoDeadline && ComputerThread.hasPendingWork();
|
||||||
if( !softAbort ) softAbort = (now - nanoCumulative) >= TIMEOUT;
|
if( !softAbort ) softAbort = (now - nanoCumulative) >= TIMEOUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,7 +80,7 @@ public final class TimeoutState
|
|||||||
*/
|
*/
|
||||||
public boolean isPaused()
|
public boolean isPaused()
|
||||||
{
|
{
|
||||||
return paused && ComputerThread.hasPendingWork();
|
return paused;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -118,6 +114,7 @@ public final class TimeoutState
|
|||||||
{
|
{
|
||||||
long now = System.nanoTime();
|
long now = System.nanoTime();
|
||||||
nanoCurrent = now;
|
nanoCurrent = now;
|
||||||
|
nanoDeadline = now + ComputerThread.scaledPeriod();
|
||||||
// Compute the "nominal start time".
|
// Compute the "nominal start time".
|
||||||
nanoCumulative = now - nanoCumulative;
|
nanoCumulative = now - nanoCumulative;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user