diff --git a/src/main/java/dan200/computercraft/core/computer/ComputerExecutor.java b/src/main/java/dan200/computercraft/core/computer/ComputerExecutor.java index bdae4028f..8a1fa44e6 100644 --- a/src/main/java/dan200/computercraft/core/computer/ComputerExecutor.java +++ b/src/main/java/dan200/computercraft/core/computer/ComputerExecutor.java @@ -103,6 +103,20 @@ final class ComputerExecutor */ volatile boolean onComputerQueue = false; + /** + * The amount of time this computer has used on a theoretical machine which shares work evenly amongst computers. + * + * @see ComputerThread + */ + long virtualRuntime = 0; + + /** + * The last time at which we updated {@link #virtualRuntime}. + * + * @see ComputerThread + */ + long vRuntimeStart; + /** * The command that {@link #work()} should execute on the computer thread. * @@ -280,9 +294,7 @@ final class ComputerExecutor { synchronized( queueLock ) { - if( onComputerQueue ) return; - onComputerQueue = true; - ComputerThread.queue( this ); + if( !onComputerQueue ) ComputerThread.queue( this ); } } @@ -482,14 +494,16 @@ final class ComputerExecutor */ void beforeWork() { + vRuntimeStart = System.nanoTime(); timeout.startTimer(); } /** - * Called after executing {@link #work()}. Adds this back to the {@link ComputerThread} if we have more work, - * otherwise remove it. + * Called after executing {@link #work()}. + * + * @return If we have more work to do. */ - void afterWork() + boolean afterWork() { if( interruptedEvent ) { @@ -502,16 +516,12 @@ final class ComputerExecutor Tracking.addTaskTiming( getComputer(), timeout.nanoCurrent() ); + if( interruptedEvent ) return true; + synchronized( queueLock ) { - if( !interruptedEvent && eventQueue.isEmpty() && command == null ) - { - onComputerQueue = false; - } - else - { - ComputerThread.queue( this ); - } + if( eventQueue.isEmpty() && command == null ) return onComputerQueue = false; + return true; } } diff --git a/src/main/java/dan200/computercraft/core/computer/ComputerThread.java b/src/main/java/dan200/computercraft/core/computer/ComputerThread.java index 2ec62e33d..c813a3931 100644 --- a/src/main/java/dan200/computercraft/core/computer/ComputerThread.java +++ b/src/main/java/dan200/computercraft/core/computer/ComputerThread.java @@ -10,11 +10,13 @@ import dan200.computercraft.ComputerCraft; import dan200.computercraft.shared.util.ThreadUtils; import javax.annotation.Nonnull; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.TreeSet; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantLock; import static dan200.computercraft.core.computer.TimeoutState.ABORT_TIMEOUT; import static dan200.computercraft.core.computer.TimeoutState.TIMEOUT; @@ -34,16 +36,32 @@ public class ComputerThread */ private static final int MONITOR_WAKEUP = 100; + /** + * The target latency between executing two tasks on a single machine. + * + * An average tick takes 50ms, and so we ideally need to have handled a couple of events within that window in order + * to have a perceived low latency. + */ + private static final long DEFAULT_LATENCY = TimeUnit.MILLISECONDS.toNanos( 50 ); + + /** + * The minimum value that {@link #DEFAULT_LATENCY} can have when scaled. + * + * From statistics gathered on SwitchCraft, almost all machines will execute under 15ms, 75% under 1.5ms, with the + * mean being about 3ms. Most computers shouldn't be too impacted with having such a short period to execute in. + */ + private static final long DEFAULT_MIN_PERIOD = TimeUnit.MILLISECONDS.toNanos( 5 ); + + /** + * The maximum number of tasks before we have to start scaling latency linearly. + */ + private static final long LATENCY_MAX_TASKS = DEFAULT_LATENCY / DEFAULT_MIN_PERIOD; + /** * Lock used for modifications to the array of current threads. */ private static final Object threadLock = new Object(); - /** - * Active executors to run - */ - private static final BlockingQueue computersActive = new LinkedBlockingQueue<>(); - /** * Whether the computer thread system is currently running */ @@ -59,6 +77,29 @@ public class ComputerThread */ private static TaskRunner[] runners; + private static long latency; + private static long minPeriod; + + private static final ReentrantLock computerLock = new ReentrantLock(); + + private static final Condition hasWork = computerLock.newCondition(); + + /** + * Active queues to execute + */ + private static final TreeSet computerQueue = new TreeSet<>( ( a, b ) -> { + if( a == b ) return 0; // Should never happen, but let's be consistent here + + long at = a.virtualRuntime, bt = b.virtualRuntime; + if( at == bt ) return Integer.compare( a.hashCode(), b.hashCode() ); + return Long.compare( at, bt ); + } ); + + /** + * The minimum {@link ComputerExecutor#virtualRuntime} time on the tree. + */ + private static long minimumVirtualRuntime = 0; + private static final ThreadFactory monitorFactory = ThreadUtils.factory( "Computer-Monitor" ); private static final ThreadFactory runnerFactory = ThreadUtils.factory( "Computer-Runner" ); @@ -76,6 +117,12 @@ public class ComputerThread { // TODO: Resize this + kill old runners and start new ones. runners = new TaskRunner[ComputerCraft.computer_threads]; + + // latency and minPeriod are scaled by 1 + floor(log2(threads)). We can afford to execute tasks for + // longer when executing on more than one thread. + long factor = 64 - Long.numberOfLeadingZeros( runners.length ); + latency = DEFAULT_LATENCY * factor; + minPeriod = DEFAULT_MIN_PERIOD * factor; } for( int i = 0; i < runners.length; i++ ) @@ -112,18 +159,125 @@ public class ComputerThread } } - computersActive.clear(); + computerQueue.clear(); } /** * Mark a computer as having work, enqueuing it on the thread. * - * @param computer The computer to execute work on. + * @param executor The computer to execute work on. */ - static void queue( @Nonnull ComputerExecutor computer ) + static void queue( @Nonnull ComputerExecutor executor ) { - if( !computer.onComputerQueue ) throw new IllegalStateException( "Computer must be on queue" ); - computersActive.add( computer ); + computerLock.lock(); + try + { + if( executor.onComputerQueue ) throw new IllegalStateException( "Cannot queue already queued executor" ); + executor.onComputerQueue = true; + + updateRuntimes(); + + // We're not currently on the queue, so update its current execution time to + // ensure its at least as high as the minimum. + long newRuntime = minimumVirtualRuntime; + + if( executor.virtualRuntime == 0 ) + { + // Slow down new computers a little bit. + newRuntime += scaledPeriod(); + } + else + { + // Give a small boost to computers which have slept a little. + newRuntime -= latency / 2; + } + + executor.virtualRuntime = Math.max( newRuntime, executor.virtualRuntime ); + + // Add to the queue, and signal the workers. + computerQueue.add( executor ); + hasWork.signal(); + } + finally + { + computerLock.unlock(); + } + } + + + /** + * Update the {@link ComputerExecutor#virtualRuntime}s of all running tasks, and then increment the + * {@link #minimumVirtualRuntime} of the executor. + */ + private static void updateRuntimes() + { + long minRuntime = Long.MAX_VALUE; + + // If we've a task on the queue, use that as our base time. + if( !computerQueue.isEmpty() ) minRuntime = computerQueue.first().virtualRuntime; + + // Update all the currently executing tasks + TaskRunner[] currentRunners = runners; + if( currentRunners != null ) + { + long now = System.nanoTime(); + int tasks = 1 + computerQueue.size(); + for( TaskRunner runner : currentRunners ) + { + if( runner == null ) continue; + ComputerExecutor executor = runner.currentExecutor.get(); + if( executor == null ) continue; + + // We do two things here: first we update the task's virtual runtime based on when we + // last checked, and then we check the minimum. + minRuntime = Math.min( minRuntime, executor.virtualRuntime += (now - executor.vRuntimeStart) / tasks ); + executor.vRuntimeStart = now; + } + } + + if( minRuntime > minimumVirtualRuntime && minRuntime < Long.MAX_VALUE ) + { + minimumVirtualRuntime = minRuntime; + } + } + + /** + * Re-add this task to the queue + * + * @param executor The executor to requeue + */ + private static void afterWork( ComputerExecutor executor ) + { + computerLock.lock(); + try + { + updateRuntimes(); + + // Add to the queue, and signal the workers. + if( !executor.afterWork() ) return; + + computerQueue.add( executor ); + hasWork.signal(); + } + finally + { + computerLock.unlock(); + } + } + + /** + * The scaled period for a single task + * + * @return The scaled period for the task + * @see #DEFAULT_LATENCY + * @see #DEFAULT_MIN_PERIOD + * @see #LATENCY_MAX_TASKS + */ + static long scaledPeriod() + { + // +1 to include the current task + int count = 1 + computerQueue.size(); + return count < LATENCY_MAX_TASKS ? latency / count : minPeriod; } /** @@ -133,7 +287,7 @@ public class ComputerThread */ static boolean hasPendingWork() { - return computersActive.size() > 0; + return computerQueue.size() > 0; } /** @@ -184,7 +338,7 @@ public class ComputerThread runner.running = false; ComputerExecutor thisExecutor = runner.currentExecutor.getAndSet( null ); - if( thisExecutor != null ) executor.afterWork(); + if( thisExecutor != null ) afterWork( executor ); synchronized( threadLock ) { @@ -213,7 +367,7 @@ public class ComputerThread } /** - * Pulls tasks from the {@link #computersActive} queue and runs them. + * Pulls tasks from the {@link #computerQueue} queue and runs them. * * This is responsible for running the {@link ComputerExecutor#work()}, {@link ComputerExecutor#beforeWork()} and * {@link ComputerExecutor#afterWork()} functions. Everything else is either handled by the executor, timeout @@ -237,7 +391,17 @@ public class ComputerThread ComputerExecutor executor; try { - executor = computersActive.take(); + computerLock.lockInterruptibly(); + try + { + while( computerQueue.isEmpty() ) hasWork.await(); + executor = computerQueue.pollFirst(); + assert executor != null : "hasWork should ensure we never receive null work"; + } + finally + { + computerLock.unlock(); + } } catch( InterruptedException ignored ) { @@ -262,7 +426,7 @@ public class ComputerThread finally { ComputerExecutor thisExecutor = currentExecutor.getAndSet( null ); - if( thisExecutor != null ) executor.afterWork(); + if( thisExecutor != null ) afterWork( executor ); } } } diff --git a/src/main/java/dan200/computercraft/core/computer/TimeoutState.java b/src/main/java/dan200/computercraft/core/computer/TimeoutState.java index 6b21d2d09..002fd756c 100644 --- a/src/main/java/dan200/computercraft/core/computer/TimeoutState.java +++ b/src/main/java/dan200/computercraft/core/computer/TimeoutState.java @@ -27,11 +27,6 @@ import java.util.concurrent.TimeUnit; */ public final class TimeoutState { - /** - * The time to run a task before pausing in nanoseconds - */ - private static final long TIMESLICE = TimeUnit.MILLISECONDS.toNanos( 40 ); - /** * The total time a task is allowed to run before aborting in nanoseconds */ @@ -53,6 +48,7 @@ public final class TimeoutState private long nanoCumulative; private long nanoCurrent; + private long nanoDeadline; long nanoCumulative() { @@ -70,7 +66,7 @@ public final class TimeoutState public void refresh() { long now = System.nanoTime(); - if( !paused ) paused = (now - nanoCurrent) >= TIMESLICE; + if( !paused ) paused = now >= nanoDeadline && ComputerThread.hasPendingWork(); if( !softAbort ) softAbort = (now - nanoCumulative) >= TIMEOUT; } @@ -84,7 +80,7 @@ public final class TimeoutState */ public boolean isPaused() { - return paused && ComputerThread.hasPendingWork(); + return paused; } /** @@ -118,6 +114,7 @@ public final class TimeoutState { long now = System.nanoTime(); nanoCurrent = now; + nanoDeadline = now + ComputerThread.scaledPeriod(); // Compute the "nominal start time". nanoCumulative = now - nanoCumulative; }