1
0
mirror of https://github.com/SquidDev-CC/CC-Tweaked synced 2025-11-04 23:53:01 +00:00

Manage ComputerThread's lifecycle in ComputerContext

This converts ComputerThread from a singleton into a proper object,
which is set up when starting a computer, and torn down when the
ComputerContext is closed.

While this is mostly for conceptual elegance, it does offer some
concrete benefits:
 - You can now adjust the thread count without restarting the whole
   game (just leaving and re-entering the world). Though, alas, no effect
   on servers.
 - We can run multiple ComputerThreads in parallel, which makes it much
   easier to run tests in parallel. This allows us to remove our rather
   silly IsolatedRunner test helper.
This commit is contained in:
Jonathan Coates
2022-10-22 14:36:25 +01:00
parent e9cde9e1bf
commit 57cf6084e2
10 changed files with 216 additions and 255 deletions

View File

@@ -5,6 +5,7 @@
*/
package dan200.computercraft.core;
import dan200.computercraft.core.computer.ComputerThread;
import dan200.computercraft.core.computer.GlobalEnvironment;
import dan200.computercraft.core.computer.mainthread.MainThreadScheduler;
import dan200.computercraft.core.lua.CobaltLuaMachine;
@@ -16,12 +17,17 @@ import dan200.computercraft.core.lua.ILuaMachine;
public final class ComputerContext implements AutoCloseable
{
private final GlobalEnvironment globalEnvironment;
private final ComputerThread computerScheduler;
private final MainThreadScheduler mainThreadScheduler;
private final ILuaMachine.Factory factory;
public ComputerContext( GlobalEnvironment globalEnvironment, MainThreadScheduler mainThreadScheduler, ILuaMachine.Factory factory )
/**
 * Create a context from its individual components. Each argument is stored as-is and exposed through the
 * corresponding accessor on this class.
 *
 * @param globalEnvironment   The global environment for this context.
 * @param computerScheduler   The {@link ComputerThread} to use as this context's computer scheduler.
 * @param mainThreadScheduler The main thread scheduler to use.
 * @param factory             The factory used to create Lua machines.
 */
public ComputerContext(
    GlobalEnvironment globalEnvironment,
    ComputerThread computerScheduler,
    MainThreadScheduler mainThreadScheduler,
    ILuaMachine.Factory factory
)
{
    this.factory = factory;
    this.mainThreadScheduler = mainThreadScheduler;
    this.computerScheduler = computerScheduler;
    this.globalEnvironment = globalEnvironment;
}
@@ -30,11 +36,12 @@ public final class ComputerContext implements AutoCloseable
* Create a default {@link ComputerContext} with the given global environment.
*
* @param environment The current global environment.
* @param threads The number of threads to use for the {@link #computerScheduler()}
* @param mainThreadScheduler The main thread scheduler to use.
*/
public ComputerContext( GlobalEnvironment environment, MainThreadScheduler mainThreadScheduler )
public ComputerContext( GlobalEnvironment environment, int threads, MainThreadScheduler mainThreadScheduler )
{
this( environment, mainThreadScheduler, CobaltLuaMachine::new );
this( environment, new ComputerThread( threads ), mainThreadScheduler, CobaltLuaMachine::new );
}
/**
@@ -47,6 +54,17 @@ public final class ComputerContext implements AutoCloseable
return globalEnvironment;
}
/**
 * Get the {@link ComputerThread} instance which computers in this context run under.
 *
 * <p>
 * The scheduler is stopped when this context is closed, so a single instance should not be shared between several
 * contexts.
 *
 * @return The current computer thread manager.
 */
public ComputerThread computerScheduler()
{
    return this.computerScheduler;
}
/**
* The {@link MainThreadScheduler} instance used to run main-thread tasks.
*
@@ -73,5 +91,6 @@ public final class ComputerContext implements AutoCloseable
@Override
public void close()
{
    // Closing the context stops the computer scheduler it holds.
    computerScheduler.stop();
}
}

View File

@@ -61,7 +61,8 @@ final class ComputerExecutor
private final ComputerEnvironment computerEnvironment;
private final MetricsObserver metrics;
private final List<ILuaAPI> apis = new ArrayList<>();
final TimeoutState timeout = new TimeoutState();
private final ComputerThread scheduler;
final TimeoutState timeout;
private FileSystem fileSystem;
@@ -164,13 +165,15 @@ final class ComputerExecutor
ComputerExecutor( Computer computer, ComputerEnvironment computerEnvironment, ComputerContext context )
{
// Ensure the computer thread is running as required.
ComputerThread.start();
this.computer = computer;
this.computerEnvironment = computerEnvironment;
metrics = computerEnvironment.getMetrics();
luaFactory = context.luaFactory();
scheduler = context.computerScheduler();
timeout = new TimeoutState( scheduler );
// Ensure the computer thread is running as required.
scheduler.start();
Environment environment = computer.getEnvironment();
@@ -316,7 +319,7 @@ final class ComputerExecutor
{
synchronized( queueLock )
{
if( !onComputerQueue ) ComputerThread.queue( this );
if( !onComputerQueue ) scheduler.queue( this );
}
}

View File

@@ -8,8 +8,8 @@ package dan200.computercraft.core.computer;
import dan200.computercraft.ComputerCraft;
import dan200.computercraft.shared.util.ThreadUtils;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -19,28 +19,25 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import static dan200.computercraft.core.computer.TimeoutState.ABORT_TIMEOUT;
import static dan200.computercraft.core.computer.TimeoutState.TIMEOUT;
/**
* Responsible for running all tasks from a {@link Computer}.
*
* <p>
* This is split into two components: the {@link TaskRunner}s, which pull an executor from the queue and execute it, and
* a single {@link Monitor} which observes all runners and kills them if they have not been terminated by
* {@link TimeoutState#isSoftAborted()}.
*
* <p>
* Computers are executed using a priority system, with those who have spent less time executing having a higher
* priority than those hogging the thread. This, combined with {@link TimeoutState#isPaused()} means we can reduce the
* risk of badly behaved computers stalling execution for everyone else.
*
* <p>
* This is done using an implementation of Linux's Completely Fair Scheduler. When a computer executes, we compute what
* share of execution time it has used (time executed/number of tasks). We then pick the computer who has the least
* "virtual execution time" (aka {@link ComputerExecutor#virtualRuntime}).
*
* <p>
* When adding a computer to the queue, we make sure its "virtual runtime" is at least as big as the smallest runtime.
* This means that adding computers which have slept a lot do not then have massive priority over everyone else. See
* {@link #queue(ComputerExecutor)} for how this is implemented.
*
* <p>
* In reality, it's unlikely that more than a few computers are waiting to execute at once, so this will not have much
* effect unless you have a computer hogging execution time. However, it is pretty effective in those situations.
*
@@ -49,6 +46,9 @@ import static dan200.computercraft.core.computer.TimeoutState.TIMEOUT;
*/
public final class ComputerThread
{
private static final ThreadFactory monitorFactory = ThreadUtils.factory( "Computer-Monitor" );
private static final ThreadFactory runnerFactory = ThreadUtils.factory( "Computer-Runner" );
/**
* How often the computer thread monitor should run.
*
@@ -58,7 +58,7 @@ public final class ComputerThread
/**
* The target latency between executing two tasks on a single machine.
*
* <p>
* An average tick takes 50ms, and so we ideally need to have handled a couple of events within that window in order
* to have a perceived low latency.
*/
@@ -66,7 +66,7 @@ public final class ComputerThread
/**
* The minimum value that {@link #DEFAULT_LATENCY} can have when scaled.
*
* <p>
* From statistics gathered on SwitchCraft, almost all machines will execute under 15ms, 75% under 1.5ms, with the
* mean being about 3ms. Most computers shouldn't be too impacted with having such a short period to execute in.
*/
@@ -87,36 +87,36 @@ public final class ComputerThread
/**
* Lock used for modifications to the array of current threads.
*/
private static final Object threadLock = new Object();
private final Object threadLock = new Object();
/**
* Whether the computer thread system is currently running.
*/
private static volatile boolean running = false;
private volatile boolean running = false;
/**
* The current task manager.
*/
private static Thread monitor;
private @Nullable Thread monitor;
/**
* The array of current runners, and their owning threads.
*/
private static TaskRunner[] runners;
private final TaskRunner[] runners;
private static long latency;
private static long minPeriod;
private final long latency;
private final long minPeriod;
private static final ReentrantLock computerLock = new ReentrantLock();
private final ReentrantLock computerLock = new ReentrantLock();
private static final Condition hasWork = computerLock.newCondition();
private static final AtomicInteger idleWorkers = new AtomicInteger( 0 );
private static final Condition monitorWakeup = computerLock.newCondition();
private final Condition hasWork = computerLock.newCondition();
private final AtomicInteger idleWorkers = new AtomicInteger( 0 );
private final Condition monitorWakeup = computerLock.newCondition();
/**
* Active queues to execute.
*/
private static final TreeSet<ComputerExecutor> computerQueue = new TreeSet<>( ( a, b ) -> {
private final TreeSet<ComputerExecutor> computerQueue = new TreeSet<>( ( a, b ) -> {
if( a == b ) return 0; // Should never happen, but let's be consistent here
long at = a.virtualRuntime, bt = b.virtualRuntime;
@@ -127,34 +127,27 @@ public final class ComputerThread
/**
* The minimum {@link ComputerExecutor#virtualRuntime} time on the tree.
*/
private static long minimumVirtualRuntime = 0;
private long minimumVirtualRuntime = 0;
private static final ThreadFactory monitorFactory = ThreadUtils.factory( "Computer-Monitor" );
private static final ThreadFactory runnerFactory = ThreadUtils.factory( "Computer-Runner" );
public ComputerThread( int threadCount )
{
runners = new TaskRunner[threadCount];
private ComputerThread() {}
// latency and minPeriod are scaled by 1 + floor(log2(threads)). We can afford to execute tasks for
// longer when executing on more than one thread.
int factor = 64 - Long.numberOfLeadingZeros( runners.length );
latency = DEFAULT_LATENCY * factor;
minPeriod = DEFAULT_MIN_PERIOD * factor;
}
/**
* Start the computer thread.
*/
static void start()
void start()
{
synchronized( threadLock )
{
running = true;
if( runners == null )
{
// TODO: Update this on config reloads. Or possibly on world restarts?
runners = new TaskRunner[ComputerCraft.computerThreads];
// latency and minPeriod are scaled by 1 + floor(log2(threads)). We can afford to execute tasks for
// longer when executing on more than one thread.
long factor = 64 - Long.numberOfLeadingZeros( runners.length );
latency = DEFAULT_LATENCY * factor;
minPeriod = DEFAULT_MIN_PERIOD * factor;
}
for( int i = 0; i < runners.length; i++ )
{
TaskRunner runner = runners[i];
@@ -174,21 +167,20 @@ public final class ComputerThread
/**
* Attempt to stop the computer thread. This interrupts each runner, and clears the task queue.
*/
public static void stop()
public void stop()
{
synchronized( threadLock )
{
running = false;
if( runners != null )
for( TaskRunner runner : runners )
{
for( TaskRunner runner : runners )
{
if( runner == null ) continue;
if( runner == null ) continue;
runner.running = false;
if( runner.owner != null ) runner.owner.interrupt();
}
runner.running = false;
if( runner.owner != null ) runner.owner.interrupt();
}
if( monitor != null ) monitor.interrupt();
}
computerLock.lock();
@@ -200,17 +192,40 @@ public final class ComputerThread
{
computerLock.unlock();
}
synchronized( threadLock )
{
if( monitor != null ) tryJoin( monitor );
for( TaskRunner runner : runners )
{
if( runner != null && runner.owner != null ) tryJoin( runner.owner );
}
}
}
/**
 * Wait briefly (up to 100ms) for a thread to finish, logging an error if it is still alive afterwards.
 *
 * @param thread The thread to wait for.
 * @throws IllegalStateException If the calling thread is interrupted while waiting. The caller's interrupt status
 *                               is restored before this is thrown.
 */
private static void tryJoin( Thread thread )
{
    try
    {
        thread.join( 100 );
    }
    catch( InterruptedException e )
    {
        // Thread.join clears the interrupt status when it throws. As we rethrow as an unchecked exception, restore
        // the flag so code further up the stack can still observe that an interrupt was requested.
        Thread.currentThread().interrupt();
        throw new IllegalStateException( "Interrupted server thread while trying to stop " + thread.getName(), e );
    }

    // The join timed out: report it rather than blocking shutdown indefinitely.
    if( thread.isAlive() ) ComputerCraft.log.error( "Failed to stop {}", thread.getName() );
}
/**
* Mark a computer as having work, enqueuing it on the thread.
*
* <p>
* You must be holding {@link ComputerExecutor}'s {@code queueLock} when calling this method - it should only
* be called from {@code enqueue}.
*
* @param executor The computer to execute work on.
*/
static void queue( @Nonnull ComputerExecutor executor )
void queue( ComputerExecutor executor )
{
computerLock.lock();
try
@@ -256,12 +271,12 @@ public final class ComputerThread
/**
* Update the {@link ComputerExecutor#virtualRuntime}s of all running tasks, and then update the
* {@link #minimumVirtualRuntime} based on the current tasks.
*
* <p>
* This is called before queueing tasks, to ensure that {@link #minimumVirtualRuntime} is up-to-date.
*
* @param current The machine which we updating runtimes from.
*/
private static void updateRuntimes( @Nullable ComputerExecutor current )
private void updateRuntimes( @Nullable ComputerExecutor current )
{
long minRuntime = Long.MAX_VALUE;
@@ -271,20 +286,16 @@ public final class ComputerThread
// Update all the currently executing tasks
long now = System.nanoTime();
int tasks = 1 + computerQueue.size();
TaskRunner[] currentRunners = runners;
if( currentRunners != null )
for( TaskRunner runner : runners )
{
for( TaskRunner runner : currentRunners )
{
if( runner == null ) continue;
ComputerExecutor executor = runner.currentExecutor.get();
if( executor == null ) continue;
if( runner == null ) continue;
ComputerExecutor executor = runner.currentExecutor.get();
if( executor == null ) continue;
// We do two things here: first we update the task's virtual runtime based on when we
// last checked, and then we check the minimum.
minRuntime = Math.min( minRuntime, executor.virtualRuntime += (now - executor.vRuntimeStart) / tasks );
executor.vRuntimeStart = now;
}
// We do two things here: first we update the task's virtual runtime based on when we
// last checked, and then we check the minimum.
minRuntime = Math.min( minRuntime, executor.virtualRuntime += (now - executor.vRuntimeStart) / tasks );
executor.vRuntimeStart = now;
}
// And update the most recently executed one (if set).
@@ -306,15 +317,18 @@ public final class ComputerThread
* @param runner The runner this task was on.
* @param executor The executor to requeue
*/
private static void afterWork( TaskRunner runner, ComputerExecutor executor )
private void afterWork( TaskRunner runner, ComputerExecutor executor )
{
// Clear the executor's thread.
Thread currentThread = executor.executingThread.getAndSet( null );
if( currentThread != runner.owner )
{
ComputerCraft.log.error(
"Expected computer #{} to be running on {}, but already running on {}. This is a SERIOUS bug, please report with your debug.log.",
executor.getComputer().getID(), runner.owner.getName(), currentThread == null ? "nothing" : currentThread.getName()
executor.getComputer().getID(),
runner.owner == null ? "nothing" : runner.owner.getName(),
currentThread == null ? "nothing" : currentThread.getName()
);
}
@@ -344,7 +358,7 @@ public final class ComputerThread
* @see #DEFAULT_MIN_PERIOD
* @see #LATENCY_MAX_TASKS
*/
static long scaledPeriod()
long scaledPeriod()
{
// +1 to include the current task
int count = 1 + computerQueue.size();
@@ -356,7 +370,7 @@ public final class ComputerThread
*
* @return If we have work queued up.
*/
static boolean hasPendingWork()
boolean hasPendingWork()
{
return !computerQueue.isEmpty();
}
@@ -367,7 +381,7 @@ public final class ComputerThread
*
* @return If the computer threads are busy.
*/
private static boolean isBusy()
private boolean isBusy()
{
return computerQueue.size() > idleWorkers.get();
}
@@ -378,7 +392,7 @@ public final class ComputerThread
*
* @see TimeoutState
*/
private static final class Monitor implements Runnable
private final class Monitor implements Runnable
{
@Override
public void run()
@@ -395,7 +409,10 @@ public final class ComputerThread
}
catch( InterruptedException e )
{
ComputerCraft.log.error( "Monitor thread interrupted. Computers may behave very badly!", e );
if( running )
{
ComputerCraft.log.error( "Monitor thread interrupted. Computers may behave very badly!", e );
}
break;
}
finally
@@ -407,11 +424,9 @@ public final class ComputerThread
}
}
private static void checkRunners()
private void checkRunners()
{
TaskRunner[] currentRunners = ComputerThread.runners;
if( currentRunners == null ) return;
TaskRunner[] currentRunners = ComputerThread.this.runners;
for( int i = 0; i < currentRunners.length; i++ )
{
TaskRunner runner = currentRunners[i];
@@ -421,10 +436,9 @@ public final class ComputerThread
if( !running ) continue;
// Mark the old runner as dead and start a new one.
ComputerCraft.log.warn( "Previous runner ({}) has crashed, restarting!",
runner != null && runner.owner != null ? runner.owner.getName() : runner );
ComputerCraft.log.warn( "Previous runner ({}) has crashed, restarting!", runner != null && runner.owner != null ? runner.owner.getName() : runner );
if( runner != null ) runner.running = false;
runnerFactory.newThread( runners[i] = new TaskRunner() ).start();
runnerFactory.newThread( runner = runners[i] = new TaskRunner() ).start();
}
// If the runner has no work, skip
@@ -437,38 +451,38 @@ public final class ComputerThread
// If we're still within normal execution times (TIMEOUT) or soft abort (ABORT_TIMEOUT),
// then we can let the Lua machine do its work.
long afterStart = executor.timeout.nanoCumulative();
long afterHardAbort = afterStart - TIMEOUT - ABORT_TIMEOUT;
long afterHardAbort = afterStart - TimeoutState.TIMEOUT - TimeoutState.ABORT_TIMEOUT;
if( afterHardAbort < 0 ) continue;
// Set the hard abort flag.
executor.timeout.hardAbort();
executor.abort();
if( afterHardAbort >= ABORT_TIMEOUT * 2 )
if( afterHardAbort >= TimeoutState.ABORT_TIMEOUT * 2 )
{
// If we've hard aborted and interrupted, and we're still not dead, then mark the runner
// as dead, finish off the task, and spawn a new runner.
runner.reportTimeout( executor, afterStart );
runner.running = false;
runner.owner.interrupt();
if( runner.owner != null ) runner.owner.interrupt();
ComputerExecutor thisExecutor = runner.currentExecutor.getAndSet( null );
if( thisExecutor != null ) afterWork( runner, executor );
synchronized( threadLock )
{
if( running && runners.length > i && runners[i] == runner )
if( running && runners[i] == runner )
{
runnerFactory.newThread( currentRunners[i] = new TaskRunner() ).start();
}
}
}
else if( afterHardAbort >= ABORT_TIMEOUT )
else if( afterHardAbort >= TimeoutState.ABORT_TIMEOUT )
{
// If we've hard aborted but we're still not dead, dump the stack trace and interrupt
// the task.
runner.reportTimeout( executor, afterStart );
runner.owner.interrupt();
if( runner.owner != null ) runner.owner.interrupt();
}
}
}
@@ -476,13 +490,14 @@ public final class ComputerThread
/**
* Pulls tasks from the {@link #computerQueue} queue and runs them.
*
* <p>
* This is responsible for running the {@link ComputerExecutor#work()}, {@link ComputerExecutor#beforeWork()} and
* {@link ComputerExecutor#afterWork()} functions. Everything else is either handled by the executor, timeout
* state or monitor.
*/
private static final class TaskRunner implements Runnable
private final class TaskRunner implements Runnable
{
@Nullable
Thread owner;
long lastReport = Long.MIN_VALUE;
volatile boolean running = true;
@@ -495,7 +510,7 @@ public final class ComputerThread
owner = Thread.currentThread();
tasks:
while( running && ComputerThread.running )
while( running && ComputerThread.this.running )
{
// Wait for an active queue to execute
ComputerExecutor executor;
@@ -572,6 +587,8 @@ public final class ComputerThread
if( lastReport != Long.MIN_VALUE && now - lastReport - REPORT_DEBOUNCE <= 0 ) return;
lastReport = now;
Thread owner = Objects.requireNonNull( this.owner );
StringBuilder builder = new StringBuilder()
.append( "Terminating computer #" ).append( executor.getComputer().getID() )
.append( " due to timeout (running for " ).append( time * 1e-9 )

View File

@@ -49,6 +49,8 @@ public final class TimeoutState
*/
public static final String ABORT_MESSAGE = "Too long without yielding";
private final ComputerThread scheduler;
private boolean paused;
private boolean softAbort;
private volatile boolean hardAbort;
@@ -73,6 +75,11 @@ public final class TimeoutState
*/
private long currentDeadline;
/**
 * Create a new timeout state bound to a scheduler.
 *
 * @param scheduler The {@link ComputerThread} this state asks for pending work and for the scaled execution period.
 */
public TimeoutState( ComputerThread scheduler )
{
this.scheduler = scheduler;
}
long nanoCumulative()
{
return System.nanoTime() - cumulativeStart;
@@ -91,7 +98,7 @@ public final class TimeoutState
// Important: The weird arithmetic here is important, as nanoTime may return negative values, and so we
// need to handle overflow.
long now = System.nanoTime();
if( !paused ) paused = currentDeadline - now <= 0 && ComputerThread.hasPendingWork(); // now >= currentDeadline
if( !paused ) paused = currentDeadline - now <= 0 && scheduler.hasPendingWork(); // now >= currentDeadline
if( !softAbort ) softAbort = now - cumulativeStart - TIMEOUT >= 0; // now - cumulativeStart >= TIMEOUT
}
@@ -143,7 +150,7 @@ public final class TimeoutState
{
long now = System.nanoTime();
currentStart = now;
currentDeadline = now + ComputerThread.scaledPeriod();
currentDeadline = now + scheduler.scaledPeriod();
// Compute the "nominal start time".
cumulativeStart = now - cumulativeElapsed;
}

View File

@@ -56,7 +56,7 @@ public final class ServerContext
this.server = server;
storageDir = server.getWorldPath( FOLDER );
mainThread = new MainThread();
context = new ComputerContext( new Environment( server ), mainThread );
context = new ComputerContext( new Environment( server ), ComputerCraft.computerThreads, mainThread );
idAssigner = new IDAssigner( storageDir.resolve( "ids.json" ) );
}