Split ComputerThread/ComputerExecutor up a little

This is an attempt to enforce better separation between ComputerThread
and ComputerExecutor. Both of these classes are pretty complex in their
own right, and the way the two bleed into each other makes it all the
more confusing!

This effectively splits the ComputerExecutor into two separate classes:
 - ComputerScheduler.Executor (with the actual implementation inside
   ComputerThread): This holds all the ComputerThread-related logic
   which used to be in ComputerExecutor, including:

    - before/after work hooks
    - is-on-thread tracking
    - virtual runtime computation

 - ComputerScheduler.Worker: This encapsulates all the computer-related
   behaviour. The actual implementation remains in ComputerExecutor.

The boundaries are still a little fuzzy here, and it's all definitely
more coupled then I'd like, but still an improvement!

There are several additional changes at the same time:

 - TimeoutState has also been split up, to better define the boundary
   between consumers (such as ComputerExecutor and ILuaMachine) and
   controllers (ComputerThread).

   The getters still live in TimeoutState, but the core logic lives in
   ManagedTimeoutState.

 - We no longer track cumulative time in the TimeoutState. Instead, we
   allow varying the timeout of a computer. When a computer is paused,
   we store the remaining time, and restore it when resuming again.

   This also allows us give a longer timeout for computer
   startup/shutdown, hopefully avoiding some of those class-not-found
   issues we've seen.

 - We try to make the state machine of how ComputerExecutors live on the
   queue a little more explicit. This is very messy/confusing -
   something I want to property test in the future.

I'm sure there's more to be done here, especially in ComputerExecutor,
but hopefully this makes future changes a little less intimidating.
This commit is contained in:
Jonathan Coates 2023-10-19 22:48:46 +01:00
parent 2228733abc
commit 0929ab577d
No known key found for this signature in database
GPG Key ID: B9E431FF07C98D06
11 changed files with 850 additions and 560 deletions

View File

@ -112,7 +112,9 @@ SPDX-License-Identifier: MPL-2.0
<module name="LambdaParameterName" />
<module name="LocalFinalVariableName" />
<module name="LocalVariableName" />
<module name="MemberName" />
<module name="MemberName">
<property name="format" value="^\$?[a-z][a-zA-Z0-9]*$" />
</module>
<module name="MethodName">
<property name="format" value="^(computercraft\$)?[a-z][a-zA-Z0-9]*$" />
</module>
@ -122,7 +124,7 @@ SPDX-License-Identifier: MPL-2.0
</module>
<module name="ParameterName" />
<module name="StaticVariableName">
<property name="format" value="^[a-z][a-zA-Z0-9]*|CAPABILITY(_[A-Z_]+)?$" />
<property name="format" value="^[a-z][a-zA-Z0-9]*$" />
</module>
<module name="TypeName" />

View File

@ -9,6 +9,7 @@
import dan200.computercraft.core.asm.LuaMethodSupplier;
import dan200.computercraft.core.asm.PeripheralMethodSupplier;
import dan200.computercraft.core.computer.GlobalEnvironment;
import dan200.computercraft.core.computer.computerthread.ComputerScheduler;
import dan200.computercraft.core.computer.computerthread.ComputerThread;
import dan200.computercraft.core.computer.mainthread.MainThreadScheduler;
import dan200.computercraft.core.computer.mainthread.NoWorkMainThreadScheduler;
@ -31,7 +32,7 @@
*/
public final class ComputerContext {
private final GlobalEnvironment globalEnvironment;
private final ComputerThread computerScheduler;
private final ComputerScheduler computerScheduler;
private final MainThreadScheduler mainThreadScheduler;
private final ILuaMachine.Factory luaFactory;
private final List<ILuaAPIFactory> apiFactories;
@ -39,7 +40,7 @@ public final class ComputerContext {
private final MethodSupplier<PeripheralMethod> peripheralMethods;
ComputerContext(
GlobalEnvironment globalEnvironment, ComputerThread computerScheduler,
GlobalEnvironment globalEnvironment, ComputerScheduler computerScheduler,
MainThreadScheduler mainThreadScheduler, ILuaMachine.Factory luaFactory,
List<ILuaAPIFactory> apiFactories, MethodSupplier<LuaMethod> luaMethods,
MethodSupplier<PeripheralMethod> peripheralMethods
@ -68,7 +69,7 @@ public GlobalEnvironment globalEnvironment() {
*
* @return The current computer thread manager.
*/
public ComputerThread computerScheduler() {
public ComputerScheduler computerScheduler() {
return computerScheduler;
}
@ -162,7 +163,7 @@ public static Builder builder(GlobalEnvironment environment) {
*/
public static class Builder {
private final GlobalEnvironment environment;
private int threads = 1;
private @Nullable ComputerScheduler computerScheduler = null;
private @Nullable MainThreadScheduler mainThreadScheduler;
private @Nullable ILuaMachine.Factory luaFactory;
private @Nullable List<ILuaAPIFactory> apiFactories;
@ -173,7 +174,7 @@ public static class Builder {
}
/**
* Set the number of threads the {@link ComputerThread} will use.
* Set the {@link #computerScheduler()} to use {@link ComputerThread} with a given number of threads.
*
* @param threads The number of threads to use.
* @return {@code this}, for chaining
@ -181,7 +182,20 @@ public static class Builder {
*/
public Builder computerThreads(int threads) {
if (threads < 1) throw new IllegalArgumentException("Threads must be >= 1");
this.threads = threads;
return computerScheduler(new ComputerThread(threads));
}
/**
* Set the {@link ComputerScheduler} for this context.
*
* @param scheduler The computer thread scheduler.
* @return {@code this}, for chaining
* @see ComputerContext#mainThreadScheduler()
*/
public Builder computerScheduler(ComputerScheduler scheduler) {
Objects.requireNonNull(scheduler);
if (computerScheduler != null) throw new IllegalStateException("Computer scheduler already specified");
computerScheduler = scheduler;
return this;
}
@ -250,7 +264,7 @@ public Builder genericMethods(Collection<GenericMethod> genericMethods) {
public ComputerContext build() {
return new ComputerContext(
environment,
new ComputerThread(threads),
computerScheduler == null ? new ComputerThread(1) : computerScheduler,
mainThreadScheduler == null ? new NoWorkMainThreadScheduler() : mainThreadScheduler,
luaFactory == null ? CobaltLuaMachine::new : luaFactory,
apiFactories == null ? List.of() : apiFactories,

View File

@ -10,6 +10,7 @@
import dan200.computercraft.core.ComputerContext;
import dan200.computercraft.core.CoreConfig;
import dan200.computercraft.core.apis.*;
import dan200.computercraft.core.computer.computerthread.ComputerScheduler;
import dan200.computercraft.core.computer.computerthread.ComputerThread;
import dan200.computercraft.core.filesystem.FileSystem;
import dan200.computercraft.core.filesystem.FileSystemException;
@ -18,7 +19,6 @@
import dan200.computercraft.core.lua.MachineException;
import dan200.computercraft.core.methods.LuaMethod;
import dan200.computercraft.core.methods.MethodSupplier;
import dan200.computercraft.core.metrics.Metrics;
import dan200.computercraft.core.metrics.MetricsObserver;
import dan200.computercraft.core.util.Colour;
import dan200.computercraft.core.util.Nullability;
@ -26,13 +26,13 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
/**
@ -55,7 +55,7 @@
* One final responsibility for the executor is calling {@link ILuaAPI#update()} every tick, via the {@link #tick()}
* method. This should only be called when the computer is actually on ({@link #isOn}).
*/
final class ComputerExecutor {
final class ComputerExecutor implements ComputerScheduler.Worker {
private static final Logger LOG = LoggerFactory.getLogger(ComputerExecutor.class);
private static final int QUEUE_LIMIT = 256;
@ -63,9 +63,7 @@ final class ComputerExecutor {
private final ComputerEnvironment computerEnvironment;
private final MetricsObserver metrics;
private final List<ApiWrapper> apis = new ArrayList<>();
private final ComputerThread scheduler;
private final MethodSupplier<LuaMethod> luaMethods;
final TimeoutState timeout;
private @Nullable FileSystem fileSystem;
@ -93,34 +91,11 @@ final class ComputerExecutor {
private final ReentrantLock isOnLock = new ReentrantLock();
/**
* A lock used for any changes to {@link #eventQueue}, {@link #command} or {@link #onComputerQueue}. This will be
* used on the main thread, so locks should be kept as brief as possible.
* A lock used for any changes to {@link #eventQueue} or {@link #command}. This will be used on the main thread,
* so locks should be kept as brief as possible.
*/
private final Object queueLock = new Object();
/**
* Determines if this executor is present within {@link ComputerThread}.
*
* @see #queueLock
* @see #enqueue()
* @see #afterWork()
*/
volatile boolean onComputerQueue = false;
/**
* The amount of time this computer has used on a theoretical machine which shares work evenly amongst computers.
*
* @see ComputerThread
*/
long virtualRuntime = 0;
/**
* The last time at which we updated {@link #virtualRuntime}.
*
* @see ComputerThread
*/
long vRuntimeStart;
/**
* The command that {@link #work()} should execute on the computer thread.
* <p>
@ -130,6 +105,7 @@ final class ComputerExecutor {
* Note, if command is not {@code null}, then some command is scheduled to be executed. Otherwise it is not
* currently in the queue (or is currently being executed).
*/
@GuardedBy("queueLock")
private volatile @Nullable StateCommand command;
/**
@ -137,43 +113,45 @@ final class ComputerExecutor {
* <p>
* Note, this should be empty if this computer is off - it is cleared on shutdown and when turning on again.
*/
@GuardedBy("queueLock")
private final Queue<Event> eventQueue = new ArrayDeque<>(4);
/**
* Whether we interrupted an event and so should resume it instead of executing another task.
* Whether this computer was paused (and so should resume without pulling an event) or not.
*
* @see #timeRemaining
* @see #work()
* @see #resumeMachine(String, Object[])
*/
private boolean interruptedEvent = false;
private boolean wasPaused;
/**
* The amount of time this computer can run for before being interrupted. This is only defined when
* {@link #wasPaused} is set.
*/
private long timeRemaining = 0;
/**
* Whether this executor has been closed, and will no longer accept any incoming commands or events.
*
* @see #queueStop(boolean, boolean)
*/
@GuardedBy("queueLock")
private boolean closed;
private @Nullable WritableMount rootMount;
/**
* The thread the executor is running on. This is non-null when performing work. We use this to ensure we're only
* doing one bit of work at one time.
*
* @see ComputerThread
*/
final AtomicReference<Thread> executingThread = new AtomicReference<>();
private final ILuaMachine.Factory luaFactory;
private final ComputerScheduler.Executor executor;
ComputerExecutor(Computer computer, ComputerEnvironment computerEnvironment, ComputerContext context) {
this.computer = computer;
this.computerEnvironment = computerEnvironment;
metrics = computerEnvironment.getMetrics();
luaFactory = context.luaFactory();
scheduler = context.computerScheduler();
luaMethods = context.luaMethods();
timeout = new TimeoutState(scheduler);
executor = context.computerScheduler().createExecutor(this, metrics);
var environment = computer.getEnvironment();
@ -193,6 +171,11 @@ final class ComputerExecutor {
}
}
@Override
public int getComputerID() {
return computer.getID();
}
boolean isOn() {
return isOn;
}
@ -203,10 +186,6 @@ FileSystem getFileSystem() {
return fileSystem;
}
Computer getComputer() {
return computer;
}
void addApi(ILuaAPI api) {
apis.add(new ApiWrapper(api, null));
}
@ -220,8 +199,8 @@ void queueStart() {
if (closed || isOn || command != null) return;
command = StateCommand.TURN_ON;
enqueue();
}
enqueue();
}
/**
@ -246,24 +225,31 @@ void queueStop(boolean reboot, boolean close) {
}
command = newCommand;
enqueue();
}
enqueue();
}
/**
* Abort this whole computer due to a timeout. This will immediately destroy the Lua machine,
* and then schedule a shutdown.
*/
void abort() {
immediateFail(StateCommand.ABORT);
@Override
public void abortWithTimeout() {
immediateFail(StateCommand.ABORT_WITH_TIMEOUT);
}
/**
* Abort this whole computer due to an internal error. This will immediately destroy the Lua machine,
* and then schedule a shutdown.
*/
void fastFail() {
immediateFail(StateCommand.ERROR);
@Override
public void abortWithError() {
immediateFail(StateCommand.ABORT_WITH_ERROR);
}
@Override
public void unload() {
queueStop(false, true);
}
private void immediateFail(StateCommand command) {
@ -294,17 +280,15 @@ void queueEvent(String event, @Nullable Object[] args) {
if (closed || command != null || eventQueue.size() >= QUEUE_LIMIT) return;
eventQueue.offer(new Event(event, args));
enqueue();
}
enqueue();
}
/**
* Add this executor to the {@link ComputerThread} if not already there.
*/
private void enqueue() {
synchronized (queueLock) {
if (!onComputerQueue) scheduler.queue(this);
}
executor.submit();
}
/**
@ -385,7 +369,7 @@ private ILuaMachine createLuaMachine() {
// Create the lua machine
try (var bios = biosStream) {
return luaFactory.create(new MachineEnvironment(
new LuaContext(computer), metrics, timeout,
new LuaContext(computer), metrics, executor.timeoutState(),
() -> apis.stream().map(ApiWrapper::api).iterator(),
luaMethods,
computer.getGlobalEnvironment().getHostString()
@ -405,7 +389,6 @@ private void turnOn() throws InterruptedException {
try {
// Reset the terminal and event queue
computer.getTerminal().reset();
interruptedEvent = false;
synchronized (queueLock) {
eventQueue.clear();
}
@ -433,15 +416,15 @@ private void turnOn() throws InterruptedException {
isOnLock.unlock();
}
// Now actually start the computer, now that everything is set up.
resumeMachine(null, null);
// Mark the Lua VM as ready to be executed next time.
wasPaused = true;
timeRemaining = TimeoutState.TIMEOUT;
}
private void shutdown() throws InterruptedException {
isOnLock.lockInterruptibly();
try {
isOn = false;
interruptedEvent = false;
isOn = wasPaused = false;
synchronized (queueLock) {
eventQueue.clear();
}
@ -469,36 +452,6 @@ private void shutdown() throws InterruptedException {
}
}
/**
* Called before calling {@link #work()}, setting up any important state.
*/
void beforeWork() {
vRuntimeStart = System.nanoTime();
timeout.startTimer();
}
/**
* Called after executing {@link #work()}.
*
* @return If we have more work to do.
*/
boolean afterWork() {
if (interruptedEvent) {
timeout.pauseTimer();
} else {
timeout.stopTimer();
}
metrics.observe(Metrics.COMPUTER_TASKS, timeout.nanoCurrent());
if (interruptedEvent) return true;
synchronized (queueLock) {
if (eventQueue.isEmpty() && command == null) return onComputerQueue = false;
return true;
}
}
/**
* The main worker function, called by {@link ComputerThread}.
* <p>
@ -508,15 +461,15 @@ boolean afterWork() {
* @see #command
* @see #eventQueue
*/
void work() throws InterruptedException {
if (interruptedEvent && !closed) {
interruptedEvent = false;
if (machine != null) {
resumeMachine(null, null);
return;
}
@Override
public void work() throws InterruptedException {
workImpl();
synchronized (queueLock) {
if (wasPaused || command != null || !eventQueue.isEmpty()) enqueue();
}
}
private void workImpl() throws InterruptedException {
StateCommand command;
Event event = null;
synchronized (queueLock) {
@ -524,7 +477,7 @@ void work() throws InterruptedException {
this.command = null;
// If we've no command, pull something from the event queue instead.
if (command == null) {
if (command == null && !wasPaused) {
if (!isOn) {
// We're not on and had no command, but we had work queued. This should never happen, so clear
// the event queue just in case.
@ -537,6 +490,7 @@ void work() throws InterruptedException {
}
if (command != null) {
wasPaused = false;
switch (command) {
case TURN_ON -> {
if (isOn) return;
@ -553,23 +507,29 @@ void work() throws InterruptedException {
shutdown();
computer.turnOn();
}
case ABORT -> {
case ABORT_WITH_TIMEOUT -> {
if (!isOn) return;
displayFailure("Error running computer", TimeoutState.ABORT_MESSAGE);
shutdown();
}
case ERROR -> {
case ABORT_WITH_ERROR -> {
if (!isOn) return;
displayFailure("Error running computer", "An internal error occurred, see logs.");
shutdown();
}
}
} else if (wasPaused) {
executor.setRemainingTime(timeRemaining);
resumeMachine(null, null);
} else if (event != null) {
executor.setRemainingTime(TimeoutState.TIMEOUT);
resumeMachine(event.name, event.args);
}
}
void printState(StringBuilder out) {
@Override
@SuppressWarnings("GuardedBy")
public void writeState(StringBuilder out) {
out.append("Enqueued command: ").append(command).append('\n');
out.append("Enqueued events: ").append(eventQueue.size()).append('\n');
@ -600,19 +560,23 @@ private void displayFailure(String message, @Nullable String extra) {
private void resumeMachine(@Nullable String event, @Nullable Object[] args) throws InterruptedException {
var result = Nullability.assertNonNull(machine).handleEvent(event, args);
interruptedEvent = result.isPause();
if (!result.isError()) return;
displayFailure("Error running computer", result.getMessage());
shutdown();
if (result.isError()) {
displayFailure("Error running computer", result.getMessage());
shutdown();
} else if (result.isPause()) {
wasPaused = true;
timeRemaining = executor.getRemainingTime();
} else {
wasPaused = false;
}
}
private enum StateCommand {
TURN_ON,
SHUTDOWN,
REBOOT,
ABORT,
ERROR,
ABORT_WITH_TIMEOUT,
ABORT_WITH_ERROR,
}
private record Event(String name, @Nullable Object[] args) {

View File

@ -5,7 +5,8 @@
package dan200.computercraft.core.computer;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import dan200.computercraft.core.computer.computerthread.ComputerThread;
import dan200.computercraft.core.computer.computerthread.ComputerScheduler;
import dan200.computercraft.core.computer.computerthread.ManagedTimeoutState;
import dan200.computercraft.core.lua.ILuaMachine;
import dan200.computercraft.core.lua.MachineResult;
@ -25,90 +26,54 @@
* (namely, throwing a "Too long without yielding" error).
* <p>
* Now, if a computer still does not stop after that period, they're behaving really badly. 1.5 seconds after a soft
* abort ({@link #ABORT_TIMEOUT}), we trigger a hard abort (note, this is done from the computer thread manager). This
* will destroy the entire Lua runtime and shut the computer down.
* abort ({@link #ABORT_TIMEOUT}), we trigger a hard abort. This will destroy the entire Lua runtime and shut the
* computer down.
* <p>
* The Lua runtime is also allowed to pause execution if there are other computers contesting for work. All computers
* are allowed to run for {@link ComputerThread#scaledPeriod()} nanoseconds (see {@link #currentDeadline}). After that
* period, if any computers are waiting to be executed then we'll set the paused flag to true ({@link #isPaused()}.
* are guaranteed to run for some time. After that period, if any computers are waiting to be executed then we'll set
* the paused flag to true ({@link #isPaused()}.
*
* @see ComputerThread
* @see ComputerScheduler
* @see ManagedTimeoutState
* @see ILuaMachine
* @see MachineResult#isPause()
*/
public final class TimeoutState {
public abstract class TimeoutState {
/**
* The total time a task is allowed to run before aborting in nanoseconds.
* The time (in nanoseconds) are computer is allowed to run for its long-running tasks, such as startup and
* shutdown.
*/
static final long TIMEOUT = TimeUnit.MILLISECONDS.toNanos(7000);
public static final long BASE_TIMEOUT = TimeUnit.SECONDS.toNanos(30);
/**
* The total time the Lua VM is allowed to run before aborting in nanoseconds.
*/
public static final long TIMEOUT = TimeUnit.MILLISECONDS.toNanos(7000);
/**
* The time the task is allowed to run after each abort in nanoseconds.
*/
static final long ABORT_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(1500);
public static final long ABORT_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(1500);
/**
* The error message to display when we trigger an abort.
*/
public static final String ABORT_MESSAGE = "Too long without yielding";
private final ComputerThread scheduler;
@GuardedBy("this")
private final List<Runnable> listeners = new ArrayList<>(0);
private boolean paused;
private boolean softAbort;
private volatile boolean hardAbort;
/**
* When the cumulative time would have started had the whole event been processed in one go.
*/
private long cumulativeStart;
/**
* How much cumulative time has elapsed. This is effectively {@code cumulativeStart - currentStart}.
*/
private long cumulativeElapsed;
/**
* When this execution round started.
*/
private long currentStart;
/**
* When this execution round should look potentially be paused.
*/
private long currentDeadline;
public TimeoutState(ComputerThread scheduler) {
this.scheduler = scheduler;
}
long nanoCumulative() {
return System.nanoTime() - cumulativeStart;
}
long nanoCurrent() {
return System.nanoTime() - currentStart;
}
protected boolean paused;
protected boolean softAbort;
protected volatile boolean hardAbort;
/**
* Recompute the {@link #isSoftAborted()} and {@link #isPaused()} flags.
* <p>
* Normally this will be called automatically by the {@link ComputerScheduler}, but it may be useful to call this
* manually if the most up-to-date information is needed.
*/
public synchronized void refresh() {
// Important: The weird arithmetic here is important, as nanoTime may return negative values, and so we
// need to handle overflow.
var now = System.nanoTime();
var changed = false;
if (!paused && (paused = currentDeadline - now <= 0 && scheduler.hasPendingWork())) { // now >= currentDeadline
changed = true;
}
if (!softAbort && (softAbort = now - cumulativeStart - TIMEOUT >= 0)) { // now - cumulativeStart >= TIMEOUT
changed = true;
}
if (changed) updateListeners();
}
public abstract void refresh();
/**
* Whether we should pause execution of this machine.
@ -118,7 +83,7 @@ public synchronized void refresh() {
*
* @return Whether we should pause execution.
*/
public boolean isPaused() {
public final boolean isPaused() {
return paused;
}
@ -127,7 +92,7 @@ public boolean isPaused() {
*
* @return {@code true} if we should throw a timeout error.
*/
public boolean isSoftAborted() {
public final boolean isSoftAborted() {
return softAbort;
}
@ -136,64 +101,22 @@ public boolean isSoftAborted() {
*
* @return {@code true} if the machine should be forcibly shut down.
*/
public boolean isHardAborted() {
public final boolean isHardAborted() {
return hardAbort;
}
/**
* If the machine should be forcibly aborted.
*/
void hardAbort() {
softAbort = hardAbort = true;
synchronized (this) {
updateListeners();
}
}
/**
* Start the current and cumulative timers again.
*/
void startTimer() {
var now = System.nanoTime();
currentStart = now;
currentDeadline = now + scheduler.scaledPeriod();
// Compute the "nominal start time".
cumulativeStart = now - cumulativeElapsed;
}
/**
* Pauses the cumulative time, to be resumed by {@link #startTimer()}.
*
* @see #nanoCumulative()
*/
synchronized void pauseTimer() {
// We set the cumulative time to difference between current time and "nominal start time".
cumulativeElapsed = System.nanoTime() - cumulativeStart;
paused = false;
updateListeners();
}
/**
* Resets the cumulative time and resets the abort flags.
*/
synchronized void stopTimer() {
cumulativeElapsed = 0;
paused = softAbort = hardAbort = false;
updateListeners();
}
@GuardedBy("this")
private void updateListeners() {
protected final void updateListeners() {
for (var listener : listeners) listener.run();
}
public synchronized void addListener(Runnable listener) {
public final synchronized void addListener(Runnable listener) {
Objects.requireNonNull(listener, "listener cannot be null");
listeners.add(listener);
listener.run();
}
public synchronized void removeListener(Runnable listener) {
public final synchronized void removeListener(Runnable listener) {
Objects.requireNonNull(listener, "listener cannot be null");
listeners.remove(listener);
}

View File

@ -0,0 +1,127 @@
// SPDX-FileCopyrightText: 2023 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.computer.computerthread;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.MetricsObserver;
import java.util.concurrent.TimeUnit;
/**
* The {@link ComputerScheduler} is responsible for executing computers on the computer thread(s).
* <p>
* This handles both scheduling the computers for work across multiple threads, as well as {@linkplain TimeoutState timing out}
* or pausing the computer if they execute for too long.
* <p>
* This API is composed of two interfaces, a {@link Worker} and {@link Executor}. The {@link ComputerScheduler}
* implementation will supply an {@link Executor}, while consuming classes should implement {@link Worker}.
* <p>
* In practice, this interface is only implemented by {@link ComputerThread} (and consumed by {@link dan200.computercraft.core.computer.ComputerExecutor}),
* however this interface is useful to enforce separation of the two.
*
* @see ManagedTimeoutState
*/
public interface ComputerScheduler {
Executor createExecutor(Worker worker, MetricsObserver metrics);
boolean stop(long timeout, TimeUnit unit) throws InterruptedException;
/**
* The {@link Executor} holds the state of a {@link Worker} within the scheduler.
* <p>
* This is used to schedule the worker for execution, as well as providing some additional control over the
* {@link TimeoutState}.
*/
interface Executor {
/**
* Submit the executor to the scheduler, marking it as ready {@linkplain Worker#work() to run some work}.
* <p>
* This function is idempotent - if the executor is already queued, nothing will happen.
*/
void submit();
/**
* Get the executor's {@link TimeoutState}.
*
* @return The executor's timeout state.
*/
TimeoutState timeoutState();
/**
* Get the amount of time this computer can run for before being interrupted.
* <p>
* This value starts off as {@link TimeoutState#BASE_TIMEOUT}, but may be reduced by
* {@link #setRemainingTime(long)}.
*
* @return The time this computer can run for being interrupted.
* @see #getRemainingTime()
*/
long getRemainingTime();
/**
* Set the amount of this computer can execute for before being interrupted.
* <p>
* This value will typically be {@link TimeoutState#TIMEOUT}, but may be a previous value of
* {@link #getRemainingTime()} if the computer is resuming after {@linkplain TimeoutState#isPaused() being
* paused}.
*
* @param time The time this computer can execute for.
* @see #getRemainingTime()
*/
void setRemainingTime(long time);
}
/**
* A {@link Worker} is responsible for actually running the computer's code.
* <p>
* his handles {@linkplain Worker#work() running the actual computer logic}, as well as providing some additional
* control methods.
* <p>
* This should be implemented by the consuming class.
*/
interface Worker {
/**
* Perform any work that the computer needs to do, for instance turning on, shutting down or actually running
* code.
* <p>
* If the computer needs to run immediately again, it should call {@link Executor#submit()} within this method.
*
* @throws InterruptedException If the computer has run for too long and must be terminated.
*/
void work() throws InterruptedException;
/**
* Get the ID of this computer, used in error messages.
*
* @return This computers ID.
*/
int getComputerID();
/**
* Write any useful debugging information computer to the provided buffer. This is used in log messages when the
* computer has run for too long.
*
* @param output The buffer to write to.
*/
void writeState(StringBuilder output);
/**
* Abort this whole computer due to a timeout.
*/
void abortWithTimeout();
/**
* Abort this whole computer due to some internal error.
*/
void abortWithError();
/**
* "Unload" this computer, shutting it down and preventing it from running again.
* <p>
* This is called by the scheduler when {@linkplain ComputerScheduler#stop(long, TimeUnit) it is stopped.}
*/
void unload();
}
}

View File

@ -5,25 +5,23 @@
package dan200.computercraft.core.computer.computerthread;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import dan200.computercraft.core.ComputerContext;
import dan200.computercraft.core.Logging;
import dan200.computercraft.core.computer.ComputerExecutor;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.Metrics;
import dan200.computercraft.core.metrics.MetricsObserver;
import dan200.computercraft.core.util.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
@ -31,9 +29,9 @@
/**
* Runs all scheduled tasks for computers in a {@link ComputerContext}.
* <p>
* This acts as an over-complicated {@link ThreadPoolExecutor}: It creates several {@link Worker} threads which pull
* tasks from a shared queue, executing them. It also creates a single {@link Monitor} thread, which updates computer
* timeouts, killing workers if they have not been terminated by {@link TimeoutState#isSoftAborted()}.
* This acts as an over-complicated {@link ThreadPoolExecutor}: It creates several {@linkplain WorkerThread worker
* threads} which pull tasks from a shared queue, executing them. It also creates a single {@link Monitor} thread, which
* updates computer timeouts, killing workers if they have not been terminated by {@link TimeoutState#isSoftAborted()}.
* <p>
* Computers are executed using a priority system, with those who have spent less time executing having a higher
* priority than those hogging the thread. This, combined with {@link TimeoutState#isPaused()} means we can reduce the
@ -41,20 +39,16 @@
* <p>
* This is done using an implementation of Linux's Completely Fair Scheduler. When a computer executes, we compute what
* share of execution time it has used (time executed/number of tasks). We then pick the computer who has the least
* "virtual execution time" (aka {@link ComputerExecutor#virtualRuntime}).
* "virtual execution time" (aka {@link ExecutorImpl#virtualRuntime}).
* <p>
* When adding a computer to the queue, we make sure its "virtual runtime" is at least as big as the smallest runtime.
* This means that adding computers which have slept a lot do not then have massive priority over everyone else. See
* {@link #queue(ComputerExecutor)} for how this is implemented.
* {@link #queue(ExecutorImpl)} for how this is implemented.
* <p>
* In reality, it's unlikely that more than a few computers are waiting to execute at once, so this will not have much
* effect unless you have a computer hogging execution time. However, it is pretty effective in those situations.
*
* @see TimeoutState For how hard timeouts are handled.
* @see ComputerExecutor For how computers actually do execution.
*/
@SuppressWarnings("GuardedBy") // FIXME: Hard to know what the correct thing to do is.
public final class ComputerThread {
public final class ComputerThread implements ComputerScheduler {
private static final Logger LOG = LoggerFactory.getLogger(ComputerThread.class);
/**
@ -98,7 +92,7 @@ public final class ComputerThread {
/**
* Time difference between reporting crashed threads.
*
* @see Worker#reportTimeout(ComputerExecutor, long)
* @see WorkerThread#reportTimeout(ExecutorImpl, long)
*/
private static final long REPORT_DEBOUNCE = TimeUnit.SECONDS.toNanos(1);
@ -125,7 +119,7 @@ public final class ComputerThread {
* The array of current workers, and their owning threads.
*/
@GuardedBy("threadLock")
private final Worker[] workers;
private final WorkerThread[] workers;
/**
* The number of workers in {@link #workers}.
@ -139,29 +133,33 @@ public final class ComputerThread {
private final long minPeriod;
private final ReentrantLock computerLock = new ReentrantLock();
private final Condition workerWakeup = computerLock.newCondition();
private final Condition monitorWakeup = computerLock.newCondition();
private final @GuardedBy("computerLock") Condition workerWakeup = computerLock.newCondition();
private final @GuardedBy("computerLock") Condition monitorWakeup = computerLock.newCondition();
private final AtomicInteger idleWorkers = new AtomicInteger(0);
/**
* Active queues to execute.
*/
private final TreeSet<ComputerExecutor> computerQueue = new TreeSet<>((a, b) -> {
@GuardedBy("computerLock")
private final TreeSet<ExecutorImpl> computerQueue = new TreeSet<>(ComputerThread::compareExecutors);
@SuppressWarnings("GuardedBy")
private static int compareExecutors(ExecutorImpl a, ExecutorImpl b) {
if (a == b) return 0; // Should never happen, but let's be consistent here
long at = a.virtualRuntime, bt = b.virtualRuntime;
if (at == bt) return Integer.compare(a.hashCode(), b.hashCode());
return at < bt ? -1 : 1;
});
}
/**
* The minimum {@link ComputerExecutor#virtualRuntime} time on the tree.
* The minimum {@link ExecutorImpl#virtualRuntime} time on the tree.
*/
private long minimumVirtualRuntime = 0;
public ComputerThread(int threadCount) {
workers = new Worker[threadCount];
workers = new WorkerThread[threadCount];
// latency and minPeriod are scaled by 1 + floor(log2(threads)). We can afford to execute tasks for
// longer when executing on more than one thread.
@ -170,13 +168,28 @@ public ComputerThread(int threadCount) {
minPeriod = DEFAULT_MIN_PERIOD * factor;
}
@Override
public Executor createExecutor(ComputerScheduler.Worker worker, MetricsObserver metrics) {
return new ExecutorImpl(worker, metrics);
}
@GuardedBy("threadLock")
private void addWorker(int index) {
LOG.trace("Spawning new worker {}.", index);
(workers[index] = new Worker(index)).owner.start();
(workers[index] = new WorkerThread(index)).owner.start();
workerCount++;
}
@SuppressWarnings("GuardedBy")
private int workerCount() {
return workerCount;
}
@SuppressWarnings("GuardedBy")
private WorkerThread[] workersReadOnly() {
return workers;
}
/**
* Ensure sufficient workers are running.
*/
@ -184,7 +197,7 @@ private void addWorker(int index) {
private void ensureRunning() {
// Don't even enter the lock if we've a monitor and don't need to/can't spawn an additional worker.
// We'll be holding the computer lock at this point, so there's no problems with idleWorkers being wrong.
if (monitor != null && (idleWorkers.get() > 0 || workerCount == workers.length)) return;
if (monitor != null && (idleWorkers.get() > 0 || workerCount() == workersReadOnly().length)) return;
threadLock.lock();
try {
@ -219,6 +232,7 @@ private void advanceState(int newState) {
* @return Whether the thread was successfully shut down.
* @throws InterruptedException If interrupted while waiting.
*/
@Override
public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
advanceState(STOPPING);
@ -271,12 +285,12 @@ public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
/**
* Mark a computer as having work, enqueuing it on the thread.
* <p>
* You must be holding {@link ComputerExecutor}'s {@code queueLock} when calling this method - it should only
* You must be holding {@link ExecutorImpl}'s {@code queueLock} when calling this method - it should only
* be called from {@code enqueue}.
*
* @param executor The computer to execute work on.
*/
void queue(ComputerExecutor executor) {
void queue(ExecutorImpl executor) {
computerLock.lock();
try {
if (state.get() != RUNNING) throw new IllegalStateException("ComputerThread is no longer running");
@ -284,9 +298,6 @@ void queue(ComputerExecutor executor) {
// Ensure we've got a worker running.
ensureRunning();
if (executor.onComputerQueue) throw new IllegalStateException("Cannot queue already queued executor");
executor.onComputerQueue = true;
updateRuntimes(null);
// We're not currently on the queue, so update its current execution time to
@ -318,14 +329,15 @@ void queue(ComputerExecutor executor) {
/**
* Update the {@link ComputerExecutor#virtualRuntime}s of all running tasks, and then update the
* Update the {@link ExecutorImpl#virtualRuntime}s of all running tasks, and then update the
* {@link #minimumVirtualRuntime} based on the current tasks.
* <p>
* This is called before queueing tasks, to ensure that {@link #minimumVirtualRuntime} is up-to-date.
*
* @param current The machine which we updating runtimes from.
*/
private void updateRuntimes(@Nullable ComputerExecutor current) {
@GuardedBy("computerLock")
private void updateRuntimes(@Nullable ExecutorImpl current) {
var minRuntime = Long.MAX_VALUE;
// If we've a task on the queue, use that as our base time.
@ -334,7 +346,7 @@ private void updateRuntimes(@Nullable ComputerExecutor current) {
// Update all the currently executing tasks
var now = System.nanoTime();
var tasks = 1 + computerQueue.size();
for (@Nullable var runner : workers) {
for (@Nullable var runner : workersReadOnly()) {
if (runner == null) continue;
var executor = runner.currentExecutor.get();
if (executor == null) continue;
@ -359,22 +371,9 @@ private void updateRuntimes(@Nullable ComputerExecutor current) {
* Ensure the "currently working" state of the executor is reset, the timings are updated, and then requeue the
* executor if needed.
*
* @param runner The runner this task was on.
* @param executor The executor to requeue
*/
private void afterWork(Worker runner, ComputerExecutor executor) {
// Clear the executor's thread.
var currentThread = executor.executingThread.getAndSet(null);
if (currentThread != runner.owner) {
LOG.error(
"Expected computer #{} to be running on {}, but already running on {}. This is a SERIOUS bug, please report with your debug.log.",
executor.getComputer().getID(),
runner.owner.getName(),
currentThread == null ? "nothing" : currentThread.getName()
);
}
private void afterWork(ExecutorImpl executor) {
computerLock.lock();
try {
updateRuntimes(executor);
@ -390,6 +389,13 @@ private void afterWork(Worker runner, ComputerExecutor executor) {
}
}
@SuppressWarnings("GuardedBy")
private int computerQueueSize() {
// FIXME: We access this on other threads (in TimeoutState), so their reads won't be consistent. This isn't
// "critical" behaviour, so not clear if it matters too much.
return computerQueue.size();
}
/**
* The scaled period for a single task.
*
@ -399,11 +405,8 @@ private void afterWork(Worker runner, ComputerExecutor executor) {
* @see #LATENCY_MAX_TASKS
*/
long scaledPeriod() {
// FIXME: We access this on other threads (in TimeoutState), so their reads won't be consistent. This isn't
// "critical" behaviour, so not clear if it matters too much.
// +1 to include the current task
var count = 1 + computerQueue.size();
var count = 1 + computerQueueSize();
return count < LATENCY_MAX_TASKS ? latency / count : minPeriod;
}
@ -413,9 +416,8 @@ long scaledPeriod() {
* @return If we have work queued up.
*/
@VisibleForTesting
public boolean hasPendingWork() {
// FIXME: See comment in scaledPeriod. Again, we access this in multiple threads but not clear if it matters!
return !computerQueue.isEmpty();
boolean hasPendingWork() {
return computerQueueSize() > 0;
}
/**
@ -424,12 +426,11 @@ public boolean hasPendingWork() {
*
* @return If the computer threads are busy.
*/
@GuardedBy("computerLock")
private boolean isBusy() {
return computerQueue.size() > idleWorkers.get();
return computerQueueSize() > idleWorkers.get();
}
private void workerFinished(Worker worker) {
private void workerFinished(WorkerThread worker) {
// We should only shut down a worker once! This should only happen if we fail to abort a worker and then the
// worker finishes normally.
if (!worker.running.getAndSet(false)) return;
@ -444,6 +445,7 @@ private void workerFinished(Worker worker) {
workerCount--;
if (workers[worker.index] != worker) {
assert false : "workerFinished but inconsistent worker";
LOG.error("Worker {} closed, but new runner has been spawned.", worker.index);
} else if (state.get() == RUNNING || (state.get() == STOPPING && hasPendingWork())) {
addWorker(worker.index);
@ -457,7 +459,7 @@ private void workerFinished(Worker worker) {
}
/**
* Observes all currently active {@link Worker}s and terminates their tasks once they have exceeded the hard
* Observes all currently active {@link WorkerThread}s and terminates their tasks once they have exceeded the hard
* abort limit.
*
* @see TimeoutState
@ -493,7 +495,7 @@ private void runImpl() {
}
private void checkRunners() {
for (@Nullable var runner : workers) {
for (@Nullable var runner : workersReadOnly()) {
if (runner == null) continue;
// If the worker has no work, skip
@ -505,25 +507,28 @@ private void checkRunners() {
// If we're still within normal execution times (TIMEOUT) or soft abort (ABORT_TIMEOUT),
// then we can let the Lua machine do its work.
var afterStart = executor.timeout.nanoCumulative();
var afterHardAbort = afterStart - TimeoutState.TIMEOUT - TimeoutState.ABORT_TIMEOUT;
var remainingTime = executor.timeout.getRemainingTime();
// If remainingTime > 0, then we're executing normally,
// If remainingTime > -ABORT_TIMEOUT, then we've soft aborted.
// Otherwise, remainingTime <= -ABORT_TIMEOUT, and we've run over by -ABORT_TIMEOUT - remainingTime.
var afterHardAbort = -remainingTime - TimeoutState.ABORT_TIMEOUT;
if (afterHardAbort < 0) continue;
// Set the hard abort flag.
executor.timeout.hardAbort();
executor.abort();
executor.worker.abortWithTimeout();
if (afterHardAbort >= TimeoutState.ABORT_TIMEOUT * 2) {
// If we've hard aborted and interrupted, and we're still not dead, then mark the worker
// as dead, finish off the task, and spawn a new runner.
runner.reportTimeout(executor, afterStart);
runner.reportTimeout(executor, remainingTime);
runner.owner.interrupt();
workerFinished(runner);
} else if (afterHardAbort >= TimeoutState.ABORT_TIMEOUT) {
// If we've hard aborted but we're still not dead, dump the stack trace and interrupt
// the task.
runner.reportTimeout(executor, afterStart);
runner.reportTimeout(executor, remainingTime);
runner.owner.interrupt();
}
}
@ -533,11 +538,11 @@ private void checkRunners() {
/**
* Pulls tasks from the {@link #computerQueue} queue and runs them.
* <p>
* This is responsible for running the {@link ComputerExecutor#work()}, {@link ComputerExecutor#beforeWork()} and
* {@link ComputerExecutor#afterWork()} functions. Everything else is either handled by the executor, timeout
* state or monitor.
* This is responsible for running the {@link ComputerScheduler.Worker#work()}, {@link ExecutorImpl#beforeWork()}
* and {@link ExecutorImpl#afterWork()} functions. Everything else is either handled by the executor,
* timeout state or monitor.
*/
private final class Worker implements Runnable {
private final class WorkerThread implements Runnable {
/**
* The index into the {@link #workers} array.
*/
@ -552,21 +557,21 @@ private final class Worker implements Runnable {
* Whether this runner is currently executing. This may be set to false when this worker terminates, or when
* we try to abandon a worker in the monitor
*
* @see #workerFinished(Worker)
* @see #workerFinished(WorkerThread)
*/
final AtomicBoolean running = new AtomicBoolean(true);
/**
* The computer we're currently running.
*/
final AtomicReference<ComputerExecutor> currentExecutor = new AtomicReference<>(null);
final AtomicReference<ExecutorImpl> currentExecutor = new AtomicReference<>(null);
/**
* The last time we reported a stack trace, used to avoid spamming the logs.
*/
AtomicLong lastReport = new AtomicLong(Long.MIN_VALUE);
Worker(int index) {
WorkerThread(int index) {
this.index = index;
owner = workerFactory.newThread(this);
}
@ -581,10 +586,9 @@ public void run() {
}
private void runImpl() {
tasks:
while (running.get()) {
// Wait for an active queue to execute
ComputerExecutor executor;
ExecutorImpl executor;
computerLock.lock();
try {
idleWorkers.getAndIncrement();
@ -599,21 +603,18 @@ private void runImpl() {
computerLock.unlock();
}
// If we're trying to executing some task on this computer while someone else is doing work, something
// is seriously wrong.
while (!executor.executingThread.compareAndSet(null, owner)) {
var existing = executor.executingThread.get();
if (existing != null) {
LOG.error(
"Trying to run computer #{} on thread {}, but already running on {}. This is a SERIOUS bug, please report with your debug.log.",
executor.getComputer().getID(), owner.getName(), existing.getName()
);
continue tasks;
}
// Mark this computer as executing.
if (!ExecutorImpl.STATE.compareAndSet(executor, ExecutorState.ON_QUEUE, ExecutorState.RUNNING)) {
assert false : "Running computer on the wrong thread";
LOG.error(
"Trying to run computer #{} on thread {}, but already running on another thread. This is a SERIOUS " +
"bug, please report with your debug.log.",
executor.worker.getComputerID(), owner.getName()
);
}
// If we're stopping, the only thing this executor should be doing is shutting down.
if (state.get() >= STOPPING) executor.queueStop(false, true);
if (state.get() >= STOPPING) executor.worker.unload();
// Reset the timers
executor.beforeWork();
@ -624,19 +625,19 @@ private void runImpl() {
// Execute the task
try {
executor.work();
executor.worker.work();
} catch (Exception | LinkageError | VirtualMachineError e) {
LOG.error("Error running task on computer #" + executor.getComputer().getID(), e);
LOG.error("Error running task on computer #" + executor.worker.getComputerID(), e);
// Tear down the computer immediately. There's no guarantee it's well-behaved from now on.
executor.fastFail();
executor.worker.abortWithError();
} finally {
var thisExecutor = currentExecutor.getAndSet(null);
if (thisExecutor != null) afterWork(this, executor);
if (thisExecutor != null) afterWork(executor);
}
}
}
private void reportTimeout(ComputerExecutor executor, long time) {
private void reportTimeout(ExecutorImpl executor, long time) {
if (!LOG.isErrorEnabled(Logging.COMPUTER_ERROR)) return;
// Attempt to debounce stack trace reporting, limiting ourselves to one every second. There's no need to be
@ -649,8 +650,8 @@ private void reportTimeout(ComputerExecutor executor, long time) {
var owner = Objects.requireNonNull(this.owner);
var builder = new StringBuilder()
.append("Terminating computer #").append(executor.getComputer().getID())
.append(" due to timeout (running for ").append(time * 1e-9)
.append("Terminating computer #").append(executor.worker.getComputerID())
.append(" due to timeout (ran over by ").append(time * -1e-9)
.append(" seconds). This is NOT a bug, but may mean a computer is misbehaving.\n")
.append("Thread ")
.append(owner.getName())
@ -664,9 +665,150 @@ private void reportTimeout(ComputerExecutor executor, long time) {
builder.append(" at ").append(element).append('\n');
}
executor.printState(builder);
executor.worker.writeState(builder);
LOG.warn(builder.toString());
}
}
/**
* The current state of a {@link ExecutorState}.
* <p>
* Executors are either enqueued (have more work to do) or not and working or not. This enum encapsulates the four
* combinations of these properties, with the following transitions:
*
* <pre>{@code
* submit() afterWork()
* IDLE ---------> ON_QUEUE <----------- REPEAT
* ^ | ^
* | | runImpl() |
* | V |
* +---------------RUNNING----------------+
* afterWork() submit()
* }</pre>
*/
enum ExecutorState {
/**
* This executor is idle.
*/
IDLE,
/**
* This executor is on the queue but idle.
*/
ON_QUEUE,
/**
* This executor is running and will transition to idle after execution.
*/
RUNNING,
/**
* This executor is running and should run again after this task finishes.
*/
REPEAT;
ExecutorState enqueue() {
return switch (this) {
case IDLE, ON_QUEUE -> ON_QUEUE;
case RUNNING, REPEAT -> REPEAT;
};
}
ExecutorState requeue() {
return switch (this) {
case IDLE, ON_QUEUE -> {
assert false : "Impossible state after executing";
LOG.error("Impossible state - calling requeue with {}.", this);
yield ExecutorState.ON_QUEUE;
}
case RUNNING -> ExecutorState.IDLE;
case REPEAT -> ExecutorState.ON_QUEUE;
};
}
}
private final class ExecutorImpl implements Executor {
public static final AtomicReferenceFieldUpdater<ExecutorImpl, ExecutorState> STATE = AtomicReferenceFieldUpdater.newUpdater(
ExecutorImpl.class, ExecutorState.class, "$state"
);
final Worker worker;
private final MetricsObserver metrics;
final TimeoutImpl timeout;
/**
* The current state of this worker.
*/
private volatile ExecutorState $state = ExecutorState.IDLE;
/**
* The amount of time this computer has used on a theoretical machine which shares work evenly amongst computers.
*
* @see ComputerThread
*/
long virtualRuntime = 0;
/**
* The last time at which we updated {@link #virtualRuntime}.
*
* @see ComputerThread
*/
long vRuntimeStart;
ExecutorImpl(Worker worker, MetricsObserver metrics) {
this.worker = worker;
this.metrics = metrics;
timeout = new TimeoutImpl();
}
/**
* Called before calling {@link Worker#work()}, setting up any important state.
*/
void beforeWork() {
vRuntimeStart = System.nanoTime();
timeout.startTimer(scaledPeriod());
}
/**
* Called after executing {@link Worker#work()}.
*
* @return If we have more work to do.
*/
boolean afterWork() {
timeout.reset();
metrics.observe(Metrics.COMPUTER_TASKS, timeout.getExecutionTime());
var state = STATE.getAndUpdate(this, ExecutorState::requeue);
return state == ExecutorState.REPEAT;
}
@Override
public void submit() {
var state = STATE.getAndUpdate(this, ExecutorState::enqueue);
if (state == ExecutorState.IDLE) queue(this);
}
@Override
public TimeoutState timeoutState() {
return timeout;
}
@Override
public long getRemainingTime() {
return timeout.getRemainingTime();
}
@Override
public void setRemainingTime(long time) {
timeout.setRemainingTime(time);
}
}
private final class TimeoutImpl extends ManagedTimeoutState {
@Override
protected boolean shouldPause() {
return hasPendingWork();
}
}
}

View File

@ -0,0 +1,127 @@
// SPDX-FileCopyrightText: 2019 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.computer.computerthread;
import dan200.computercraft.core.computer.TimeoutState;
/**
* A basic {@link TimeoutState} implementation, for use by {@link ComputerScheduler} implementations.
* <p>
* While almost all {@link TimeoutState} implementations will be derived from this class, the two are intentionally kept
* separate. This class is intended for the {@link ComputerScheduler} (which is responsible for controlling the
* timeout), and not for the main computer logic, which only needs to check timeout flags.
* <p>
* This class tracks the time a computer was started (and thus {@linkplain #getExecutionTime()} how long it has been
* running for), as well as the deadline for when a computer should be soft aborted and paused.
*/
public abstract class ManagedTimeoutState extends TimeoutState {
/**
* When execution of this computer started.
*
* @see #getExecutionTime()
*/
private long startTime;
/**
* The time when this computer should be aborted.
*
* @see #getRemainingTime()
* @see #setRemainingTime(long)
*/
private long abortDeadline;
/**
* The time when this computer should be paused if {@link ComputerThread#hasPendingWork()} is set.
*/
private long pauseDeadline;
@Override
public final synchronized void refresh() {
// Important: The weird arithmetic here is important, as nanoTime may return negative values, and so we
// need to handle overflow.
var now = System.nanoTime();
var changed = false;
if (!paused && Long.compareUnsigned(now, pauseDeadline) >= 0 && shouldPause()) { // now >= currentDeadline
paused = true;
changed = true;
}
if (!softAbort && Long.compareUnsigned(now, abortDeadline) >= 0) { // now >= currentAbort
softAbort = true;
changed = true;
}
if (softAbort && !hardAbort && Long.compareUnsigned(now, abortDeadline + ABORT_TIMEOUT) >= 0) { // now >= currentAbort + ABORT_TIMEOUT.
hardAbort = true;
changed = true;
}
if (changed) updateListeners();
}
/**
* Get how long this computer has been executing for.
*
* @return How long the computer has been running for in nanoseconds.
*/
public final long getExecutionTime() {
return System.nanoTime() - startTime;
}
/**
* Get how long this computer is permitted to run before being aborted.
*
* @return The remaining time, in nanoseconds.
* @see ComputerScheduler.Executor#getRemainingTime()
*/
public final long getRemainingTime() {
return abortDeadline - System.nanoTime();
}
/**
* Set how long this computer is permitted to run before being aborted.
*
* @param time The remaining time, in nanoseconds.
* @see ComputerScheduler.Executor#setRemainingTime(long)
*/
public final void setRemainingTime(long time) {
abortDeadline = startTime + time;
}
/**
* Set the hard-abort flag immediately.
*/
public final void hardAbort() {
softAbort = hardAbort = true;
synchronized (this) {
updateListeners();
}
}
/**
* Start this timer, recording the current start time, and deadline before a computer may be paused.
*
* @param pauseTimeout The minimum time this computer can run before potentially being paused.
*/
public final synchronized void startTimer(long pauseTimeout) {
var now = System.nanoTime();
startTime = now;
abortDeadline = now + BASE_TIMEOUT;
pauseDeadline = now + pauseTimeout;
}
/**
* Clear the paused and abort flags.
*/
public final synchronized void reset() {
paused = softAbort = hardAbort = false;
updateListeners();
}
/**
* Determine if this computer should be paused, as other computers are contending for work.
*
* @return If this computer should be paused.
*/
protected abstract boolean shouldPause();
}

View File

@ -0,0 +1,142 @@
// SPDX-FileCopyrightText: 2023 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.computer.computerthread;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.MetricsObserver;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.concurrent.GuardedBy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
public class ComputerThreadRunner implements AutoCloseable {
private final ComputerThread thread;
private final Lock errorLock = new ReentrantLock();
private final @GuardedBy("errorLock") Condition hasError = errorLock.newCondition();
@GuardedBy("errorLock")
private @MonotonicNonNull Throwable error = null;
public ComputerThreadRunner() {
this.thread = new ComputerThread(1);
}
public ComputerThread thread() {
return thread;
}
@Override
public void close() {
try {
if (!thread.stop(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("Failed to shutdown ComputerContext in time.");
}
} catch (InterruptedException e) {
throw new IllegalStateException("Runtime thread was interrupted", e);
}
}
public Worker createWorker(BiConsumer<ComputerScheduler.Executor, TimeoutState> action) {
return new Worker(thread, e -> action.accept(e, e.timeoutState()));
}
public void createLoopingComputer() {
new Worker(thread, e -> {
Thread.sleep(100);
e.submit();
}).executor().submit();
}
public void startAndWait(Worker worker) throws Exception {
worker.executor().submit();
do {
errorLock.lock();
try {
rethrowIfNeeded();
if (hasError.await(100, TimeUnit.MILLISECONDS)) rethrowIfNeeded();
} finally {
errorLock.unlock();
}
} while (worker.executed == 0 || thread.hasPendingWork());
}
@GuardedBy("errorLock")
private void rethrowIfNeeded() throws Exception {
if (error != null) ComputerThreadRunner.<Exception>throwUnchecked0(error);
}
@SuppressWarnings("unchecked")
private static <T extends Throwable> void throwUnchecked0(Throwable t) throws T {
throw (T) t;
}
@FunctionalInterface
private interface Task {
void run(ComputerScheduler.Executor executor) throws InterruptedException;
}
public final class Worker implements ComputerScheduler.Worker {
private final Task run;
private final ComputerScheduler.Executor executor;
volatile int executed = 0;
private Worker(ComputerScheduler scheduler, Task run) {
this.run = run;
this.executor = scheduler.createExecutor(this, MetricsObserver.discard());
}
public ComputerScheduler.Executor executor() {
return executor;
}
@Override
public void work() {
try {
run.run(executor);
executed++;
} catch (Throwable e) {
errorLock.lock();
try {
if (error == null) {
error = e;
hasError.signal();
} else {
error.addSuppressed(e);
}
} finally {
errorLock.unlock();
}
if (e instanceof Exception || e instanceof AssertionError) return;
throwUnchecked0(e);
}
}
@Override
public int getComputerID() {
return 0;
}
@Override
public void writeState(StringBuilder output) {
}
@Override
public void abortWithTimeout() {
}
@Override
public void unload() {
}
@Override
public void abortWithError() {
}
}
}

View File

@ -4,9 +4,8 @@
package dan200.computercraft.core.computer.computerthread;
import dan200.computercraft.core.lua.MachineResult;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.test.core.ConcurrentHelpers;
import dan200.computercraft.test.core.computer.KotlinComputerManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -28,11 +27,11 @@
@Execution(ExecutionMode.CONCURRENT)
public class ComputerThreadTest {
private static final Logger LOG = LoggerFactory.getLogger(ComputerThreadTest.class);
private KotlinComputerManager manager;
private ComputerThreadRunner manager;
@BeforeEach
public void before() {
manager = new KotlinComputerManager();
manager = new ComputerThreadRunner();
}
@AfterEach
@ -42,16 +41,13 @@ public void after() {
@Test
public void testSoftAbort() throws Exception {
var computer = manager.create();
manager.enqueue(computer, timeout -> {
var computer = manager.createWorker((executor, timeout) -> {
executor.setRemainingTime(TimeoutState.TIMEOUT);
assertFalse(timeout.isSoftAborted(), "Should not start soft-aborted");
var delay = ConcurrentHelpers.waitUntil(timeout::isSoftAborted);
assertThat("Should be soft aborted", delay * 1e-9, closeTo(7, 1.0));
LOG.info("Slept for {}", delay);
computer.shutdown();
return MachineResult.OK;
});
manager.startAndWait(computer);
@ -59,15 +55,12 @@ public void testSoftAbort() throws Exception {
@Test
public void testHardAbort() throws Exception {
var computer = manager.create();
manager.enqueue(computer, timeout -> {
var computer = manager.createWorker((executor, timeout) -> {
executor.setRemainingTime(TimeoutState.TIMEOUT);
assertFalse(timeout.isHardAborted(), "Should not start soft-aborted");
assertThrows(InterruptedException.class, () -> Thread.sleep(11_000), "Sleep should be hard aborted");
assertTrue(timeout.isHardAborted(), "Thread should be hard aborted");
computer.shutdown();
return MachineResult.OK;
});
manager.startAndWait(computer);
@ -75,13 +68,9 @@ public void testHardAbort() throws Exception {
@Test
public void testNoPauseIfNoOtherMachines() throws Exception {
var computer = manager.create();
manager.enqueue(computer, timeout -> {
var computer = manager.createWorker((executor, timeout) -> {
var didPause = ConcurrentHelpers.waitUntil(timeout::isPaused, 5, TimeUnit.SECONDS);
assertFalse(didPause, "Machine shouldn't have paused within 5s");
computer.shutdown();
return MachineResult.OK;
});
manager.startAndWait(computer);
@ -89,18 +78,14 @@ public void testNoPauseIfNoOtherMachines() throws Exception {
@Test
public void testPauseIfSomeOtherMachine() throws Exception {
var computer = manager.create();
manager.enqueue(computer, timeout -> {
var budget = manager.context().computerScheduler().scaledPeriod();
var computer = manager.createWorker((executor, timeout) -> {
var budget = manager.thread().scaledPeriod();
assertEquals(budget, TimeUnit.MILLISECONDS.toNanos(25), "Budget should be 25ms");
var delay = ConcurrentHelpers.waitUntil(timeout::isPaused);
// Linux appears to have much more accurate timing than Windows/OSX. Or at least on CI!
var time = System.getProperty("os.name", "").toLowerCase(Locale.ROOT).contains("linux") ? 0.05 : 0.3;
assertThat("Paused within a short time", delay * 1e-9, lessThanOrEqualTo(time));
computer.shutdown();
return MachineResult.OK;
});
manager.createLoopingComputer();

View File

@ -1,189 +0,0 @@
// SPDX-FileCopyrightText: 2022 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.test.core.computer
import dan200.computercraft.api.lua.ILuaAPI
import dan200.computercraft.core.ComputerContext
import dan200.computercraft.core.computer.Computer
import dan200.computercraft.core.computer.TimeoutState
import dan200.computercraft.core.lua.MachineEnvironment
import dan200.computercraft.core.lua.MachineResult
import dan200.computercraft.core.terminal.Terminal
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
typealias FakeComputerTask = (state: TimeoutState) -> MachineResult
/**
* Creates "fake" computers, which just run user-defined tasks rather than Lua code.
*/
class KotlinComputerManager : AutoCloseable {
private val machines: MutableMap<Computer, Queue<FakeComputerTask>> = HashMap()
private val context = ComputerContext.builder(BasicEnvironment())
.luaFactory { env, _ -> DummyLuaMachine(env) }
.build()
private val errorLock: Lock = ReentrantLock()
private val hasError = errorLock.newCondition()
@Volatile
private var error: Throwable? = null
override fun close() {
try {
context.ensureClosed(10, TimeUnit.SECONDS)
} catch (e: InterruptedException) {
throw IllegalStateException("Runtime thread was interrupted", e)
}
}
fun context(): ComputerContext {
return context
}
/**
* Create a new computer which pulls from our task queue.
*
* @return The computer. This will not be started yet, you must call [Computer.turnOn] and
* [Computer.tick] to do so.
*/
fun create(): Computer {
val queue: Queue<FakeComputerTask> = ConcurrentLinkedQueue()
val computer = Computer(
context,
BasicEnvironment(),
Terminal(51, 19, true),
0,
)
computer.addApi(QueuePassingAPI(queue)) // Inject an extra API to pass the queue to the machine.
machines[computer] = queue
return computer
}
/**
* Create and start a new computer which loops forever.
*/
fun createLoopingComputer() {
val computer = create()
enqueueForever(computer) {
Thread.sleep(100)
MachineResult.OK
}
computer.turnOn()
computer.tick()
}
/**
* Enqueue a task on a computer.
*
* @param computer The computer to enqueue the work on.
* @param task The task to run.
*/
fun enqueue(computer: Computer, task: FakeComputerTask) {
machines[computer]!!.offer(task)
}
/**
* Enqueue a repeated task on a computer. This is automatically requeued when the task finishes, meaning the task
* queue is never empty.
*
* @param computer The computer to enqueue the work on.
* @param task The task to run.
*/
private fun enqueueForever(computer: Computer, task: FakeComputerTask) {
machines[computer]!!.offer {
val result = task(it)
enqueueForever(computer, task)
computer.queueEvent("some_event", null)
result
}
}
/**
* Sleep for a given period, immediately propagating any exceptions thrown by a computer.
*
* @param delay The duration to sleep for.
* @param unit The time unit the duration is measured in.
* @throws Exception An exception thrown by a running computer.
*/
@Throws(Exception::class)
fun sleep(delay: Long, unit: TimeUnit?) {
errorLock.lock()
try {
rethrowIfNeeded()
if (hasError.await(delay, unit)) rethrowIfNeeded()
} finally {
errorLock.unlock()
}
}
/**
* Start a computer and wait for it to finish.
*
* @param computer The computer to wait for.
* @throws Exception An exception thrown by a running computer.
*/
@Throws(Exception::class)
fun startAndWait(computer: Computer) {
computer.turnOn()
computer.tick()
do {
sleep(100, TimeUnit.MILLISECONDS)
} while (context.computerScheduler().hasPendingWork() || computer.isOn)
rethrowIfNeeded()
}
@Throws(Exception::class)
private fun rethrowIfNeeded() {
val error = error ?: return
throw error
}
private class QueuePassingAPI constructor(val tasks: Queue<FakeComputerTask>) : ILuaAPI {
override fun getNames(): Array<String> = arrayOf()
}
private inner class DummyLuaMachine(private val environment: MachineEnvironment) : KotlinLuaMachine(environment) {
private val tasks: Queue<FakeComputerTask> =
environment.apis.asSequence().filterIsInstance(QueuePassingAPI::class.java).first().tasks
override fun getTask(): (suspend KotlinLuaMachine.() -> Unit)? {
try {
val task = tasks.remove()
return {
try {
task(environment.timeout)
} catch (e: Throwable) {
reportError(e)
}
}
} catch (e: Throwable) {
reportError(e)
return null
}
}
override fun close() {}
private fun reportError(e: Throwable) {
errorLock.lock()
try {
if (error == null) {
error = e
hasError.signal()
} else {
error!!.addSuppressed(e)
}
} finally {
errorLock.unlock()
}
if (e is Exception || e is AssertionError) return else throw e
}
}
}

View File

@ -5,57 +5,110 @@
package dan200.computercraft.core.computer.computerthread;
import cc.tweaked.web.js.Callbacks;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.MetricsObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teavm.jso.browser.TimerHandler;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
/**
* A reimplementation of {@link ComputerThread} which, well, avoids any threading!
* An implementation of {@link ComputerScheduler} which executes work as soon as possible via
* {@link Callbacks#setImmediate(TimerHandler)}.
* <p>
* This instead just exucutes work as soon as possible via {@link Callbacks#setImmediate(TimerHandler)}. Timeouts are
* instead handled via polling, see {@link cc.tweaked.web.builder.PatchCobalt}.
* Timeouts are instead handled via polling, see {@link cc.tweaked.web.builder.PatchCobalt}.
*
* @see ComputerThread
*/
public class TComputerThread {
private static final ArrayDeque<ComputerExecutor> executors = new ArrayDeque<>();
private final TimerHandler callback = this::workOnce;
public class TComputerThread implements ComputerScheduler {
private static final Logger LOG = LoggerFactory.getLogger(TComputerThread.class);
private static final long SCALED_PERIOD = 50 * 1_000_000L;
private static final ArrayDeque<ExecutorImpl> executors = new ArrayDeque<>();
private static final TimerHandler callback = TComputerThread::workOnce;
public TComputerThread(int threads) {
}
public void queue(ComputerExecutor executor) {
if (executor.onComputerQueue) throw new IllegalStateException("Cannot queue already queued executor");
executor.onComputerQueue = true;
if (executors.isEmpty()) Callbacks.setImmediate(callback);
executors.add(executor);
@Override
public Executor createExecutor(Worker worker, MetricsObserver metrics) {
return new ExecutorImpl(worker);
}
private void workOnce() {
private static void workOnce() {
var executor = executors.poll();
if (executor == null) throw new IllegalStateException("Working, but executor is null");
if (!executor.onComputerQueue) throw new IllegalArgumentException("Working but not on queue");
executor.beforeWork();
try {
executor.work();
executor.worker.work();
} catch (Exception e) {
e.printStackTrace();
LOG.error("Error running computer", e);
executor.worker.abortWithError();
}
executor.afterWork();
if (executor.afterWork()) executors.push(executor);
if (!executors.isEmpty()) Callbacks.setImmediate(callback);
}
public boolean hasPendingWork() {
return true;
}
public long scaledPeriod() {
return 50 * 1_000_000L;
}
@Override
public boolean stop(long timeout, TimeUnit unit) {
return true;
}
/**
* The {@link Executor} for our scheduler.
*/
private static final class ExecutorImpl implements ComputerScheduler.Executor {
final ComputerScheduler.Worker worker;
private final TimeoutImpl timeout = new TimeoutImpl();
private boolean onQueue;
private ExecutorImpl(Worker worker) {
this.worker = worker;
}
@Override
public void submit() {
if (onQueue) return;
onQueue = true;
if (executors.isEmpty()) Callbacks.setImmediate(callback);
executors.add(this);
}
void beforeWork() {
if (!onQueue) throw new IllegalArgumentException("Working but not on queue");
onQueue = false;
timeout.startTimer(SCALED_PERIOD);
}
void afterWork() {
timeout.reset();
}
@Override
public TimeoutState timeoutState() {
return timeout;
}
@Override
public long getRemainingTime() {
return timeout.getRemainingTime();
}
@Override
public void setRemainingTime(long time) {
timeout.setRemainingTime(time);
}
}
private static final class TimeoutImpl extends ManagedTimeoutState {
@Override
protected boolean shouldPause() {
return true;
}
}
}