Merge branch 'feature/split-computer-thread' into mc-1.20.x

This commit is contained in:
Jonathan Coates 2023-10-19 22:54:56 +01:00
commit e3ced84885
No known key found for this signature in database
GPG Key ID: B9E431FF07C98D06
12 changed files with 891 additions and 597 deletions

View File

@ -112,7 +112,9 @@ SPDX-License-Identifier: MPL-2.0
<module name="LambdaParameterName" />
<module name="LocalFinalVariableName" />
<module name="LocalVariableName" />
<module name="MemberName" />
<module name="MemberName">
<property name="format" value="^\$?[a-z][a-zA-Z0-9]*$" />
</module>
<module name="MethodName">
<property name="format" value="^(computercraft\$)?[a-z][a-zA-Z0-9]*$" />
</module>
@ -122,7 +124,7 @@ SPDX-License-Identifier: MPL-2.0
</module>
<module name="ParameterName" />
<module name="StaticVariableName">
<property name="format" value="^[a-z][a-zA-Z0-9]*|CAPABILITY(_[A-Z_]+)?$" />
<property name="format" value="^[a-z][a-zA-Z0-9]*$" />
</module>
<module name="TypeName" />

View File

@ -8,8 +8,9 @@
import dan200.computercraft.core.asm.GenericMethod;
import dan200.computercraft.core.asm.LuaMethodSupplier;
import dan200.computercraft.core.asm.PeripheralMethodSupplier;
import dan200.computercraft.core.computer.ComputerThread;
import dan200.computercraft.core.computer.GlobalEnvironment;
import dan200.computercraft.core.computer.computerthread.ComputerScheduler;
import dan200.computercraft.core.computer.computerthread.ComputerThread;
import dan200.computercraft.core.computer.mainthread.MainThreadScheduler;
import dan200.computercraft.core.computer.mainthread.NoWorkMainThreadScheduler;
import dan200.computercraft.core.lua.CobaltLuaMachine;
@ -31,7 +32,7 @@
*/
public final class ComputerContext {
private final GlobalEnvironment globalEnvironment;
private final ComputerThread computerScheduler;
private final ComputerScheduler computerScheduler;
private final MainThreadScheduler mainThreadScheduler;
private final ILuaMachine.Factory luaFactory;
private final List<ILuaAPIFactory> apiFactories;
@ -39,7 +40,7 @@ public final class ComputerContext {
private final MethodSupplier<PeripheralMethod> peripheralMethods;
ComputerContext(
GlobalEnvironment globalEnvironment, ComputerThread computerScheduler,
GlobalEnvironment globalEnvironment, ComputerScheduler computerScheduler,
MainThreadScheduler mainThreadScheduler, ILuaMachine.Factory luaFactory,
List<ILuaAPIFactory> apiFactories, MethodSupplier<LuaMethod> luaMethods,
MethodSupplier<PeripheralMethod> peripheralMethods
@ -68,7 +69,7 @@ public GlobalEnvironment globalEnvironment() {
*
* @return The current computer thread manager.
*/
public ComputerThread computerScheduler() {
public ComputerScheduler computerScheduler() {
return computerScheduler;
}
@ -162,7 +163,7 @@ public static Builder builder(GlobalEnvironment environment) {
*/
public static class Builder {
private final GlobalEnvironment environment;
private int threads = 1;
private @Nullable ComputerScheduler computerScheduler = null;
private @Nullable MainThreadScheduler mainThreadScheduler;
private @Nullable ILuaMachine.Factory luaFactory;
private @Nullable List<ILuaAPIFactory> apiFactories;
@ -173,7 +174,7 @@ public static class Builder {
}
/**
* Set the number of threads the {@link ComputerThread} will use.
* Set the {@link #computerScheduler()} to use {@link ComputerThread} with a given number of threads.
*
* @param threads The number of threads to use.
* @return {@code this}, for chaining
@ -181,7 +182,20 @@ public static class Builder {
*/
public Builder computerThreads(int threads) {
if (threads < 1) throw new IllegalArgumentException("Threads must be >= 1");
this.threads = threads;
return computerScheduler(new ComputerThread(threads));
}
/**
* Set the {@link ComputerScheduler} for this context.
*
* @param scheduler The computer thread scheduler.
* @return {@code this}, for chaining
* @see ComputerContext#computerScheduler()
*/
public Builder computerScheduler(ComputerScheduler scheduler) {
Objects.requireNonNull(scheduler);
if (computerScheduler != null) throw new IllegalStateException("Computer scheduler already specified");
computerScheduler = scheduler;
return this;
}
@ -250,7 +264,7 @@ public Builder genericMethods(Collection<GenericMethod> genericMethods) {
public ComputerContext build() {
return new ComputerContext(
environment,
new ComputerThread(threads),
computerScheduler == null ? new ComputerThread(1) : computerScheduler,
mainThreadScheduler == null ? new NoWorkMainThreadScheduler() : mainThreadScheduler,
luaFactory == null ? CobaltLuaMachine::new : luaFactory,
apiFactories == null ? List.of() : apiFactories,

View File

@ -10,6 +10,8 @@
import dan200.computercraft.core.ComputerContext;
import dan200.computercraft.core.CoreConfig;
import dan200.computercraft.core.apis.*;
import dan200.computercraft.core.computer.computerthread.ComputerScheduler;
import dan200.computercraft.core.computer.computerthread.ComputerThread;
import dan200.computercraft.core.filesystem.FileSystem;
import dan200.computercraft.core.filesystem.FileSystemException;
import dan200.computercraft.core.lua.ILuaMachine;
@ -17,7 +19,6 @@
import dan200.computercraft.core.lua.MachineException;
import dan200.computercraft.core.methods.LuaMethod;
import dan200.computercraft.core.methods.MethodSupplier;
import dan200.computercraft.core.metrics.Metrics;
import dan200.computercraft.core.metrics.MetricsObserver;
import dan200.computercraft.core.util.Colour;
import dan200.computercraft.core.util.Nullability;
@ -25,13 +26,13 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
/**
@ -54,7 +55,7 @@
* One final responsibility for the executor is calling {@link ILuaAPI#update()} every tick, via the {@link #tick()}
* method. This should only be called when the computer is actually on ({@link #isOn}).
*/
final class ComputerExecutor {
final class ComputerExecutor implements ComputerScheduler.Worker {
private static final Logger LOG = LoggerFactory.getLogger(ComputerExecutor.class);
private static final int QUEUE_LIMIT = 256;
@ -62,9 +63,7 @@ final class ComputerExecutor {
private final ComputerEnvironment computerEnvironment;
private final MetricsObserver metrics;
private final List<ApiWrapper> apis = new ArrayList<>();
private final ComputerThread scheduler;
private final MethodSupplier<LuaMethod> luaMethods;
final TimeoutState timeout;
private @Nullable FileSystem fileSystem;
@ -92,34 +91,11 @@ final class ComputerExecutor {
private final ReentrantLock isOnLock = new ReentrantLock();
/**
* A lock used for any changes to {@link #eventQueue}, {@link #command} or {@link #onComputerQueue}. This will be
* used on the main thread, so locks should be kept as brief as possible.
* A lock used for any changes to {@link #eventQueue} or {@link #command}. This will be used on the main thread,
* so locks should be kept as brief as possible.
*/
private final Object queueLock = new Object();
/**
* Determines if this executor is present within {@link ComputerThread}.
*
* @see #queueLock
* @see #enqueue()
* @see #afterWork()
*/
volatile boolean onComputerQueue = false;
/**
* The amount of time this computer has used on a theoretical machine which shares work evenly amongst computers.
*
* @see ComputerThread
*/
long virtualRuntime = 0;
/**
* The last time at which we updated {@link #virtualRuntime}.
*
* @see ComputerThread
*/
long vRuntimeStart;
/**
* The command that {@link #work()} should execute on the computer thread.
* <p>
@ -129,6 +105,7 @@ final class ComputerExecutor {
* Note, if command is not {@code null}, then some command is scheduled to be executed. Otherwise it is not
* currently in the queue (or is currently being executed).
*/
@GuardedBy("queueLock")
private volatile @Nullable StateCommand command;
/**
@ -136,43 +113,45 @@ final class ComputerExecutor {
* <p>
* Note, this should be empty if this computer is off - it is cleared on shutdown and when turning on again.
*/
@GuardedBy("queueLock")
private final Queue<Event> eventQueue = new ArrayDeque<>(4);
/**
* Whether we interrupted an event and so should resume it instead of executing another task.
* Whether this computer was paused (and so should resume without pulling an event) or not.
*
* @see #timeRemaining
* @see #work()
* @see #resumeMachine(String, Object[])
*/
private boolean interruptedEvent = false;
private boolean wasPaused;
/**
* The amount of time this computer can run for before being interrupted. This is only defined when
* {@link #wasPaused} is set.
*/
private long timeRemaining = 0;
/**
* Whether this executor has been closed, and will no longer accept any incoming commands or events.
*
* @see #queueStop(boolean, boolean)
*/
@GuardedBy("queueLock")
private boolean closed;
private @Nullable WritableMount rootMount;
/**
* The thread the executor is running on. This is non-null when performing work. We use this to ensure we're only
* doing one bit of work at one time.
*
* @see ComputerThread
*/
final AtomicReference<Thread> executingThread = new AtomicReference<>();
private final ILuaMachine.Factory luaFactory;
private final ComputerScheduler.Executor executor;
ComputerExecutor(Computer computer, ComputerEnvironment computerEnvironment, ComputerContext context) {
this.computer = computer;
this.computerEnvironment = computerEnvironment;
metrics = computerEnvironment.getMetrics();
luaFactory = context.luaFactory();
scheduler = context.computerScheduler();
luaMethods = context.luaMethods();
timeout = new TimeoutState(scheduler);
executor = context.computerScheduler().createExecutor(this, metrics);
var environment = computer.getEnvironment();
@ -192,6 +171,11 @@ final class ComputerExecutor {
}
}
@Override
public int getComputerID() {
return computer.getID();
}
boolean isOn() {
return isOn;
}
@ -202,10 +186,6 @@ FileSystem getFileSystem() {
return fileSystem;
}
Computer getComputer() {
return computer;
}
void addApi(ILuaAPI api) {
apis.add(new ApiWrapper(api, null));
}
@ -219,8 +199,8 @@ void queueStart() {
if (closed || isOn || command != null) return;
command = StateCommand.TURN_ON;
enqueue();
}
enqueue();
}
/**
@ -245,24 +225,31 @@ void queueStop(boolean reboot, boolean close) {
}
command = newCommand;
enqueue();
}
enqueue();
}
/**
* Abort this whole computer due to a timeout. This will immediately destroy the Lua machine,
* and then schedule a shutdown.
*/
void abort() {
immediateFail(StateCommand.ABORT);
@Override
public void abortWithTimeout() {
immediateFail(StateCommand.ABORT_WITH_TIMEOUT);
}
/**
* Abort this whole computer due to an internal error. This will immediately destroy the Lua machine,
* and then schedule a shutdown.
*/
void fastFail() {
immediateFail(StateCommand.ERROR);
@Override
public void abortWithError() {
immediateFail(StateCommand.ABORT_WITH_ERROR);
}
@Override
public void unload() {
queueStop(false, true);
}
private void immediateFail(StateCommand command) {
@ -293,17 +280,15 @@ void queueEvent(String event, @Nullable Object[] args) {
if (closed || command != null || eventQueue.size() >= QUEUE_LIMIT) return;
eventQueue.offer(new Event(event, args));
enqueue();
}
enqueue();
}
/**
* Add this executor to the {@link ComputerThread} if not already there.
*/
private void enqueue() {
synchronized (queueLock) {
if (!onComputerQueue) scheduler.queue(this);
}
executor.submit();
}
/**
@ -384,7 +369,7 @@ private ILuaMachine createLuaMachine() {
// Create the lua machine
try (var bios = biosStream) {
return luaFactory.create(new MachineEnvironment(
new LuaContext(computer), metrics, timeout,
new LuaContext(computer), metrics, executor.timeoutState(),
() -> apis.stream().map(ApiWrapper::api).iterator(),
luaMethods,
computer.getGlobalEnvironment().getHostString()
@ -404,7 +389,6 @@ private void turnOn() throws InterruptedException {
try {
// Reset the terminal and event queue
computer.getTerminal().reset();
interruptedEvent = false;
synchronized (queueLock) {
eventQueue.clear();
}
@ -432,15 +416,15 @@ private void turnOn() throws InterruptedException {
isOnLock.unlock();
}
// Now actually start the computer, now that everything is set up.
resumeMachine(null, null);
// Mark the Lua VM as ready to be executed next time.
wasPaused = true;
timeRemaining = TimeoutState.TIMEOUT;
}
private void shutdown() throws InterruptedException {
isOnLock.lockInterruptibly();
try {
isOn = false;
interruptedEvent = false;
isOn = wasPaused = false;
synchronized (queueLock) {
eventQueue.clear();
}
@ -468,36 +452,6 @@ private void shutdown() throws InterruptedException {
}
}
/**
* Called before calling {@link #work()}, setting up any important state.
*/
void beforeWork() {
vRuntimeStart = System.nanoTime();
timeout.startTimer();
}
/**
* Called after executing {@link #work()}.
*
* @return If we have more work to do.
*/
boolean afterWork() {
if (interruptedEvent) {
timeout.pauseTimer();
} else {
timeout.stopTimer();
}
metrics.observe(Metrics.COMPUTER_TASKS, timeout.nanoCurrent());
if (interruptedEvent) return true;
synchronized (queueLock) {
if (eventQueue.isEmpty() && command == null) return onComputerQueue = false;
return true;
}
}
/**
* The main worker function, called by {@link ComputerThread}.
* <p>
@ -507,15 +461,15 @@ boolean afterWork() {
* @see #command
* @see #eventQueue
*/
void work() throws InterruptedException {
if (interruptedEvent && !closed) {
interruptedEvent = false;
if (machine != null) {
resumeMachine(null, null);
return;
}
@Override
public void work() throws InterruptedException {
workImpl();
synchronized (queueLock) {
if (wasPaused || command != null || !eventQueue.isEmpty()) enqueue();
}
}
private void workImpl() throws InterruptedException {
StateCommand command;
Event event = null;
synchronized (queueLock) {
@ -523,7 +477,7 @@ void work() throws InterruptedException {
this.command = null;
// If we've no command, pull something from the event queue instead.
if (command == null) {
if (command == null && !wasPaused) {
if (!isOn) {
// We're not on and had no command, but we had work queued. This should never happen, so clear
// the event queue just in case.
@ -536,6 +490,7 @@ void work() throws InterruptedException {
}
if (command != null) {
wasPaused = false;
switch (command) {
case TURN_ON -> {
if (isOn) return;
@ -552,23 +507,29 @@ void work() throws InterruptedException {
shutdown();
computer.turnOn();
}
case ABORT -> {
case ABORT_WITH_TIMEOUT -> {
if (!isOn) return;
displayFailure("Error running computer", TimeoutState.ABORT_MESSAGE);
shutdown();
}
case ERROR -> {
case ABORT_WITH_ERROR -> {
if (!isOn) return;
displayFailure("Error running computer", "An internal error occurred, see logs.");
shutdown();
}
}
} else if (wasPaused) {
executor.setRemainingTime(timeRemaining);
resumeMachine(null, null);
} else if (event != null) {
executor.setRemainingTime(TimeoutState.TIMEOUT);
resumeMachine(event.name, event.args);
}
}
void printState(StringBuilder out) {
@Override
@SuppressWarnings("GuardedBy")
public void writeState(StringBuilder out) {
out.append("Enqueued command: ").append(command).append('\n');
out.append("Enqueued events: ").append(eventQueue.size()).append('\n');
@ -599,19 +560,23 @@ private void displayFailure(String message, @Nullable String extra) {
private void resumeMachine(@Nullable String event, @Nullable Object[] args) throws InterruptedException {
var result = Nullability.assertNonNull(machine).handleEvent(event, args);
interruptedEvent = result.isPause();
if (!result.isError()) return;
displayFailure("Error running computer", result.getMessage());
shutdown();
if (result.isError()) {
displayFailure("Error running computer", result.getMessage());
shutdown();
} else if (result.isPause()) {
wasPaused = true;
timeRemaining = executor.getRemainingTime();
} else {
wasPaused = false;
}
}
private enum StateCommand {
TURN_ON,
SHUTDOWN,
REBOOT,
ABORT,
ERROR,
ABORT_WITH_TIMEOUT,
ABORT_WITH_ERROR,
}
private record Event(String name, @Nullable Object[] args) {

View File

@ -5,6 +5,8 @@
package dan200.computercraft.core.computer;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import dan200.computercraft.core.computer.computerthread.ComputerScheduler;
import dan200.computercraft.core.computer.computerthread.ManagedTimeoutState;
import dan200.computercraft.core.lua.ILuaMachine;
import dan200.computercraft.core.lua.MachineResult;
@ -24,90 +26,54 @@
* (namely, throwing a "Too long without yielding" error).
* <p>
* Now, if a computer still does not stop after that period, they're behaving really badly. 1.5 seconds after a soft
* abort ({@link #ABORT_TIMEOUT}), we trigger a hard abort (note, this is done from the computer thread manager). This
* will destroy the entire Lua runtime and shut the computer down.
* abort ({@link #ABORT_TIMEOUT}), we trigger a hard abort. This will destroy the entire Lua runtime and shut the
* computer down.
* <p>
* The Lua runtime is also allowed to pause execution if there are other computers contesting for work. All computers
* are allowed to run for {@link ComputerThread#scaledPeriod()} nanoseconds (see {@link #currentDeadline}). After that
* period, if any computers are waiting to be executed then we'll set the paused flag to true ({@link #isPaused()}.
* are guaranteed to run for some time. After that period, if any computers are waiting to be executed then we'll set
* the paused flag to true ({@link #isPaused()}).
*
* @see ComputerThread
* @see ComputerScheduler
* @see ManagedTimeoutState
* @see ILuaMachine
* @see MachineResult#isPause()
*/
public final class TimeoutState {
public abstract class TimeoutState {
/**
* The total time a task is allowed to run before aborting in nanoseconds.
* The time (in nanoseconds) a computer is allowed to run for its long-running tasks, such as startup and
* shutdown.
*/
static final long TIMEOUT = TimeUnit.MILLISECONDS.toNanos(7000);
public static final long BASE_TIMEOUT = TimeUnit.SECONDS.toNanos(30);
/**
* The total time the Lua VM is allowed to run before aborting in nanoseconds.
*/
public static final long TIMEOUT = TimeUnit.MILLISECONDS.toNanos(7000);
/**
* The time the task is allowed to run after each abort in nanoseconds.
*/
static final long ABORT_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(1500);
public static final long ABORT_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(1500);
/**
* The error message to display when we trigger an abort.
*/
public static final String ABORT_MESSAGE = "Too long without yielding";
private final ComputerThread scheduler;
@GuardedBy("this")
private final List<Runnable> listeners = new ArrayList<>(0);
private boolean paused;
private boolean softAbort;
private volatile boolean hardAbort;
/**
* When the cumulative time would have started had the whole event been processed in one go.
*/
private long cumulativeStart;
/**
* How much cumulative time has elapsed. This is effectively {@code cumulativeStart - currentStart}.
*/
private long cumulativeElapsed;
/**
* When this execution round started.
*/
private long currentStart;
/**
* When this execution round should look potentially be paused.
*/
private long currentDeadline;
public TimeoutState(ComputerThread scheduler) {
this.scheduler = scheduler;
}
long nanoCumulative() {
return System.nanoTime() - cumulativeStart;
}
long nanoCurrent() {
return System.nanoTime() - currentStart;
}
protected boolean paused;
protected boolean softAbort;
protected volatile boolean hardAbort;
/**
* Recompute the {@link #isSoftAborted()} and {@link #isPaused()} flags.
* <p>
* Normally this will be called automatically by the {@link ComputerScheduler}, but it may be useful to call this
* manually if the most up-to-date information is needed.
*/
public synchronized void refresh() {
// Important: The weird arithmetic here is important, as nanoTime may return negative values, and so we
// need to handle overflow.
var now = System.nanoTime();
var changed = false;
if (!paused && (paused = currentDeadline - now <= 0 && scheduler.hasPendingWork())) { // now >= currentDeadline
changed = true;
}
if (!softAbort && (softAbort = now - cumulativeStart - TIMEOUT >= 0)) { // now - cumulativeStart >= TIMEOUT
changed = true;
}
if (changed) updateListeners();
}
public abstract void refresh();
/**
* Whether we should pause execution of this machine.
@ -117,7 +83,7 @@ public synchronized void refresh() {
*
* @return Whether we should pause execution.
*/
public boolean isPaused() {
public final boolean isPaused() {
return paused;
}
@ -126,7 +92,7 @@ public boolean isPaused() {
*
* @return {@code true} if we should throw a timeout error.
*/
public boolean isSoftAborted() {
public final boolean isSoftAborted() {
return softAbort;
}
@ -135,64 +101,22 @@ public boolean isSoftAborted() {
*
* @return {@code true} if the machine should be forcibly shut down.
*/
public boolean isHardAborted() {
public final boolean isHardAborted() {
return hardAbort;
}
/**
* If the machine should be forcibly aborted.
*/
void hardAbort() {
softAbort = hardAbort = true;
synchronized (this) {
updateListeners();
}
}
/**
* Start the current and cumulative timers again.
*/
void startTimer() {
var now = System.nanoTime();
currentStart = now;
currentDeadline = now + scheduler.scaledPeriod();
// Compute the "nominal start time".
cumulativeStart = now - cumulativeElapsed;
}
/**
* Pauses the cumulative time, to be resumed by {@link #startTimer()}.
*
* @see #nanoCumulative()
*/
synchronized void pauseTimer() {
// We set the cumulative time to difference between current time and "nominal start time".
cumulativeElapsed = System.nanoTime() - cumulativeStart;
paused = false;
updateListeners();
}
/**
* Resets the cumulative time and resets the abort flags.
*/
synchronized void stopTimer() {
cumulativeElapsed = 0;
paused = softAbort = hardAbort = false;
updateListeners();
}
@GuardedBy("this")
private void updateListeners() {
protected final void updateListeners() {
for (var listener : listeners) listener.run();
}
public synchronized void addListener(Runnable listener) {
public final synchronized void addListener(Runnable listener) {
Objects.requireNonNull(listener, "listener cannot be null");
listeners.add(listener);
listener.run();
}
public synchronized void removeListener(Runnable listener) {
public final synchronized void removeListener(Runnable listener) {
Objects.requireNonNull(listener, "listener cannot be null");
listeners.remove(listener);
}

View File

@ -0,0 +1,127 @@
// SPDX-FileCopyrightText: 2023 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.computer.computerthread;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.MetricsObserver;
import java.util.concurrent.TimeUnit;
/**
* The {@link ComputerScheduler} is responsible for executing computers on the computer thread(s).
* <p>
* This handles both scheduling the computers for work across multiple threads, as well as {@linkplain TimeoutState timing out}
* or pausing the computer if they execute for too long.
* <p>
* This API is composed of two interfaces, a {@link Worker} and {@link Executor}. The {@link ComputerScheduler}
* implementation will supply an {@link Executor}, while consuming classes should implement {@link Worker}.
* <p>
* In practice, this interface is only implemented by {@link ComputerThread} (and consumed by {@link dan200.computercraft.core.computer.ComputerExecutor}),
* however this interface is useful to enforce separation of the two.
*
* @see ManagedTimeoutState
*/
public interface ComputerScheduler {
/**
* Create the {@link Executor} which holds the scheduling state for a {@link Worker}.
*
* @param worker  The worker whose {@link Worker#work()} method the scheduler will run.
* @param metrics The metrics observer associated with this worker. NOTE(review): presumably used to record
*                execution-time metrics for the computer - confirm against the implementation.
* @return The executor used to submit this worker for execution and to access its {@link TimeoutState}.
*/
Executor createExecutor(Worker worker, MetricsObserver metrics);
/**
* Stop this scheduler. Workers are {@linkplain Worker#unload() unloaded} as part of stopping.
*
* @param timeout The length of the timeout. NOTE(review): presumably the maximum time to wait for the scheduler
*                to shut down - confirm against the implementation.
* @param unit    The unit {@code timeout} is measured in.
* @return Whether the scheduler was successfully shut down.
* @throws InterruptedException If interrupted while waiting.
*/
boolean stop(long timeout, TimeUnit unit) throws InterruptedException;
/**
* The {@link Executor} holds the state of a {@link Worker} within the scheduler.
* <p>
* This is used to schedule the worker for execution, as well as providing some additional control over the
* {@link TimeoutState}.
*/
interface Executor {
/**
* Submit the executor to the scheduler, marking it as ready {@linkplain Worker#work() to run some work}.
* <p>
* This function is idempotent - if the executor is already queued, nothing will happen.
*/
void submit();
/**
* Get the executor's {@link TimeoutState}.
*
* @return The executor's timeout state.
*/
TimeoutState timeoutState();
/**
* Get the amount of time this computer can run for before being interrupted.
* <p>
* This value starts off as {@link TimeoutState#BASE_TIMEOUT}, but may be reduced by
* {@link #setRemainingTime(long)}.
*
* @return The time this computer can run for before being interrupted.
* @see #setRemainingTime(long)
*/
long getRemainingTime();
/**
* Set the amount of time this computer can execute for before being interrupted.
* <p>
* This value will typically be {@link TimeoutState#TIMEOUT}, but may be a previous value of
* {@link #getRemainingTime()} if the computer is resuming after {@linkplain TimeoutState#isPaused() being
* paused}.
*
* @param time The time this computer can execute for.
* @see #getRemainingTime()
*/
void setRemainingTime(long time);
}
/**
* A {@link Worker} is responsible for actually running the computer's code.
* <p>
* This handles {@linkplain Worker#work() running the actual computer logic}, as well as providing some additional
* control methods.
* <p>
* This should be implemented by the consuming class.
*/
interface Worker {
/**
* Perform any work that the computer needs to do, for instance turning on, shutting down or actually running
* code.
* <p>
* If the computer needs to run immediately again, it should call {@link Executor#submit()} within this method.
*
* @throws InterruptedException If the computer has run for too long and must be terminated.
*/
void work() throws InterruptedException;
/**
* Get the ID of this computer, used in error messages.
*
* @return This computer's ID.
*/
int getComputerID();
/**
* Write any useful debugging information about this computer to the provided buffer. This is used in log
* messages when the computer has run for too long.
*
* @param output The buffer to write to.
*/
void writeState(StringBuilder output);
/**
* Abort this whole computer due to a timeout.
*/
void abortWithTimeout();
/**
* Abort this whole computer due to some internal error.
*/
void abortWithError();
/**
* "Unload" this computer, shutting it down and preventing it from running again.
* <p>
* This is called by the scheduler when {@linkplain ComputerScheduler#stop(long, TimeUnit) it is stopped}.
*/
void unload();
}
}

View File

@ -2,26 +2,26 @@
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.computer;
package dan200.computercraft.core.computer.computerthread;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import dan200.computercraft.core.ComputerContext;
import dan200.computercraft.core.Logging;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.Metrics;
import dan200.computercraft.core.metrics.MetricsObserver;
import dan200.computercraft.core.util.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
@ -29,9 +29,9 @@
/**
* Runs all scheduled tasks for computers in a {@link ComputerContext}.
* <p>
* This acts as an over-complicated {@link ThreadPoolExecutor}: It creates several {@link Worker} threads which pull
* tasks from a shared queue, executing them. It also creates a single {@link Monitor} thread, which updates computer
* timeouts, killing workers if they have not been terminated by {@link TimeoutState#isSoftAborted()}.
* This acts as an over-complicated {@link ThreadPoolExecutor}: It creates several {@linkplain WorkerThread worker
* threads} which pull tasks from a shared queue, executing them. It also creates a single {@link Monitor} thread, which
* updates computer timeouts, killing workers if they have not been terminated by {@link TimeoutState#isSoftAborted()}.
* <p>
* Computers are executed using a priority system, with those who have spent less time executing having a higher
* priority than those hogging the thread. This, combined with {@link TimeoutState#isPaused()} means we can reduce the
@ -39,20 +39,16 @@
* <p>
* This is done using an implementation of Linux's Completely Fair Scheduler. When a computer executes, we compute what
* share of execution time it has used (time executed/number of tasks). We then pick the computer who has the least
* "virtual execution time" (aka {@link ComputerExecutor#virtualRuntime}).
* "virtual execution time" (aka {@link ExecutorImpl#virtualRuntime}).
* <p>
* When adding a computer to the queue, we make sure its "virtual runtime" is at least as big as the smallest runtime.
* This means that adding computers which have slept a lot does not then give them massive priority over everyone else. See
* {@link #queue(ComputerExecutor)} for how this is implemented.
* {@link #queue(ExecutorImpl)} for how this is implemented.
* <p>
* In reality, it's unlikely that more than a few computers are waiting to execute at once, so this will not have much
* effect unless you have a computer hogging execution time. However, it is pretty effective in those situations.
*
* @see TimeoutState For how hard timeouts are handled.
* @see ComputerExecutor For how computers actually do execution.
*/
@SuppressWarnings("GuardedBy") // FIXME: Hard to know what the correct thing to do is.
public final class ComputerThread {
public final class ComputerThread implements ComputerScheduler {
private static final Logger LOG = LoggerFactory.getLogger(ComputerThread.class);
/**
@ -96,7 +92,7 @@ public final class ComputerThread {
/**
* Time difference between reporting crashed threads.
*
* @see Worker#reportTimeout(ComputerExecutor, long)
* @see WorkerThread#reportTimeout(ExecutorImpl, long)
*/
private static final long REPORT_DEBOUNCE = TimeUnit.SECONDS.toNanos(1);
@ -123,7 +119,7 @@ public final class ComputerThread {
* The array of current workers, and their owning threads.
*/
@GuardedBy("threadLock")
private final Worker[] workers;
private final WorkerThread[] workers;
/**
* The number of workers in {@link #workers}.
@ -137,29 +133,33 @@ public final class ComputerThread {
private final long minPeriod;
private final ReentrantLock computerLock = new ReentrantLock();
private final Condition workerWakeup = computerLock.newCondition();
private final Condition monitorWakeup = computerLock.newCondition();
private final @GuardedBy("computerLock") Condition workerWakeup = computerLock.newCondition();
private final @GuardedBy("computerLock") Condition monitorWakeup = computerLock.newCondition();
private final AtomicInteger idleWorkers = new AtomicInteger(0);
/**
* Active queues to execute.
*/
private final TreeSet<ComputerExecutor> computerQueue = new TreeSet<>((a, b) -> {
@GuardedBy("computerLock")
private final TreeSet<ExecutorImpl> computerQueue = new TreeSet<>(ComputerThread::compareExecutors);
@SuppressWarnings("GuardedBy")
private static int compareExecutors(ExecutorImpl a, ExecutorImpl b) {
if (a == b) return 0; // Should never happen, but let's be consistent here
long at = a.virtualRuntime, bt = b.virtualRuntime;
if (at == bt) return Integer.compare(a.hashCode(), b.hashCode());
return at < bt ? -1 : 1;
});
}
/**
* The minimum {@link ComputerExecutor#virtualRuntime} time on the tree.
* The minimum {@link ExecutorImpl#virtualRuntime} time on the tree.
*/
private long minimumVirtualRuntime = 0;
public ComputerThread(int threadCount) {
workers = new Worker[threadCount];
workers = new WorkerThread[threadCount];
// latency and minPeriod are scaled by 1 + floor(log2(threads)). We can afford to execute tasks for
// longer when executing on more than one thread.
@ -168,13 +168,28 @@ public ComputerThread(int threadCount) {
minPeriod = DEFAULT_MIN_PERIOD * factor;
}
@Override
public Executor createExecutor(ComputerScheduler.Worker worker, MetricsObserver metrics) {
return new ExecutorImpl(worker, metrics);
}
@GuardedBy("threadLock")
private void addWorker(int index) {
LOG.trace("Spawning new worker {}.", index);
(workers[index] = new Worker(index)).owner.start();
(workers[index] = new WorkerThread(index)).owner.start();
workerCount++;
}
@SuppressWarnings("GuardedBy")
private int workerCount() {
return workerCount;
}
@SuppressWarnings("GuardedBy")
private WorkerThread[] workersReadOnly() {
return workers;
}
/**
* Ensure sufficient workers are running.
*/
@ -182,7 +197,7 @@ private void addWorker(int index) {
private void ensureRunning() {
// Don't even enter the lock if we've a monitor and don't need to/can't spawn an additional worker.
// We'll be holding the computer lock at this point, so there's no problems with idleWorkers being wrong.
if (monitor != null && (idleWorkers.get() > 0 || workerCount == workers.length)) return;
if (monitor != null && (idleWorkers.get() > 0 || workerCount() == workersReadOnly().length)) return;
threadLock.lock();
try {
@ -217,6 +232,7 @@ private void advanceState(int newState) {
* @return Whether the thread was successfully shut down.
* @throws InterruptedException If interrupted while waiting.
*/
@Override
public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
advanceState(STOPPING);
@ -269,12 +285,12 @@ public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
/**
* Mark a computer as having work, enqueuing it on the thread.
* <p>
* You must be holding {@link ComputerExecutor}'s {@code queueLock} when calling this method - it should only
* You must be holding {@link ExecutorImpl}'s {@code queueLock} when calling this method - it should only
* be called from {@code enqueue}.
*
* @param executor The computer to execute work on.
*/
void queue(ComputerExecutor executor) {
void queue(ExecutorImpl executor) {
computerLock.lock();
try {
if (state.get() != RUNNING) throw new IllegalStateException("ComputerThread is no longer running");
@ -282,9 +298,6 @@ void queue(ComputerExecutor executor) {
// Ensure we've got a worker running.
ensureRunning();
if (executor.onComputerQueue) throw new IllegalStateException("Cannot queue already queued executor");
executor.onComputerQueue = true;
updateRuntimes(null);
// We're not currently on the queue, so update its current execution time to
@ -316,14 +329,15 @@ void queue(ComputerExecutor executor) {
/**
* Update the {@link ComputerExecutor#virtualRuntime}s of all running tasks, and then update the
* Update the {@link ExecutorImpl#virtualRuntime}s of all running tasks, and then update the
* {@link #minimumVirtualRuntime} based on the current tasks.
* <p>
* This is called before queueing tasks, to ensure that {@link #minimumVirtualRuntime} is up-to-date.
*
* @param current The machine which we are updating runtimes from.
*/
private void updateRuntimes(@Nullable ComputerExecutor current) {
@GuardedBy("computerLock")
private void updateRuntimes(@Nullable ExecutorImpl current) {
var minRuntime = Long.MAX_VALUE;
// If we've a task on the queue, use that as our base time.
@ -332,7 +346,7 @@ private void updateRuntimes(@Nullable ComputerExecutor current) {
// Update all the currently executing tasks
var now = System.nanoTime();
var tasks = 1 + computerQueue.size();
for (@Nullable var runner : workers) {
for (@Nullable var runner : workersReadOnly()) {
if (runner == null) continue;
var executor = runner.currentExecutor.get();
if (executor == null) continue;
@ -357,22 +371,9 @@ private void updateRuntimes(@Nullable ComputerExecutor current) {
* Ensure the "currently working" state of the executor is reset, the timings are updated, and then requeue the
* executor if needed.
*
* @param runner The runner this task was on.
* @param executor The executor to requeue
*/
private void afterWork(Worker runner, ComputerExecutor executor) {
// Clear the executor's thread.
var currentThread = executor.executingThread.getAndSet(null);
if (currentThread != runner.owner) {
LOG.error(
"Expected computer #{} to be running on {}, but already running on {}. This is a SERIOUS bug, please report with your debug.log.",
executor.getComputer().getID(),
runner.owner.getName(),
currentThread == null ? "nothing" : currentThread.getName()
);
}
private void afterWork(ExecutorImpl executor) {
computerLock.lock();
try {
updateRuntimes(executor);
@ -388,6 +389,13 @@ private void afterWork(Worker runner, ComputerExecutor executor) {
}
}
@SuppressWarnings("GuardedBy")
private int computerQueueSize() {
// FIXME: We access this on other threads (in TimeoutState), so their reads won't be consistent. This isn't
// "critical" behaviour, so not clear if it matters too much.
return computerQueue.size();
}
/**
* The scaled period for a single task.
*
@ -397,11 +405,8 @@ private void afterWork(Worker runner, ComputerExecutor executor) {
* @see #LATENCY_MAX_TASKS
*/
long scaledPeriod() {
// FIXME: We access this on other threads (in TimeoutState), so their reads won't be consistent. This isn't
// "critical" behaviour, so not clear if it matters too much.
// +1 to include the current task
var count = 1 + computerQueue.size();
var count = 1 + computerQueueSize();
return count < LATENCY_MAX_TASKS ? latency / count : minPeriod;
}
@ -411,9 +416,8 @@ long scaledPeriod() {
* @return If we have work queued up.
*/
@VisibleForTesting
public boolean hasPendingWork() {
// FIXME: See comment in scaledPeriod. Again, we access this in multiple threads but not clear if it matters!
return !computerQueue.isEmpty();
boolean hasPendingWork() {
return computerQueueSize() > 0;
}
/**
@ -422,12 +426,11 @@ public boolean hasPendingWork() {
*
* @return If the computer threads are busy.
*/
@GuardedBy("computerLock")
private boolean isBusy() {
return computerQueue.size() > idleWorkers.get();
return computerQueueSize() > idleWorkers.get();
}
private void workerFinished(Worker worker) {
private void workerFinished(WorkerThread worker) {
// We should only shut down a worker once! This should only happen if we fail to abort a worker and then the
// worker finishes normally.
if (!worker.running.getAndSet(false)) return;
@ -442,6 +445,7 @@ private void workerFinished(Worker worker) {
workerCount--;
if (workers[worker.index] != worker) {
assert false : "workerFinished but inconsistent worker";
LOG.error("Worker {} closed, but new runner has been spawned.", worker.index);
} else if (state.get() == RUNNING || (state.get() == STOPPING && hasPendingWork())) {
addWorker(worker.index);
@ -455,7 +459,7 @@ private void workerFinished(Worker worker) {
}
/**
* Observes all currently active {@link Worker}s and terminates their tasks once they have exceeded the hard
* Observes all currently active {@link WorkerThread}s and terminates their tasks once they have exceeded the hard
* abort limit.
*
* @see TimeoutState
@ -491,7 +495,7 @@ private void runImpl() {
}
private void checkRunners() {
for (@Nullable var runner : workers) {
for (@Nullable var runner : workersReadOnly()) {
if (runner == null) continue;
// If the worker has no work, skip
@ -503,25 +507,28 @@ private void checkRunners() {
// If we're still within normal execution times (TIMEOUT) or soft abort (ABORT_TIMEOUT),
// then we can let the Lua machine do its work.
var afterStart = executor.timeout.nanoCumulative();
var afterHardAbort = afterStart - TimeoutState.TIMEOUT - TimeoutState.ABORT_TIMEOUT;
var remainingTime = executor.timeout.getRemainingTime();
// If remainingTime > 0, then we're executing normally,
// If remainingTime > -ABORT_TIMEOUT, then we've soft aborted.
// Otherwise, remainingTime <= -ABORT_TIMEOUT, and we've run over by -ABORT_TIMEOUT - remainingTime.
var afterHardAbort = -remainingTime - TimeoutState.ABORT_TIMEOUT;
if (afterHardAbort < 0) continue;
// Set the hard abort flag.
executor.timeout.hardAbort();
executor.abort();
executor.worker.abortWithTimeout();
if (afterHardAbort >= TimeoutState.ABORT_TIMEOUT * 2) {
// If we've hard aborted and interrupted, and we're still not dead, then mark the worker
// as dead, finish off the task, and spawn a new runner.
runner.reportTimeout(executor, afterStart);
runner.reportTimeout(executor, remainingTime);
runner.owner.interrupt();
workerFinished(runner);
} else if (afterHardAbort >= TimeoutState.ABORT_TIMEOUT) {
// If we've hard aborted but we're still not dead, dump the stack trace and interrupt
// the task.
runner.reportTimeout(executor, afterStart);
runner.reportTimeout(executor, remainingTime);
runner.owner.interrupt();
}
}
@ -531,11 +538,11 @@ private void checkRunners() {
/**
* Pulls tasks from the {@link #computerQueue} queue and runs them.
* <p>
* This is responsible for running the {@link ComputerExecutor#work()}, {@link ComputerExecutor#beforeWork()} and
* {@link ComputerExecutor#afterWork()} functions. Everything else is either handled by the executor, timeout
* state or monitor.
* This is responsible for running the {@link ComputerScheduler.Worker#work()}, {@link ExecutorImpl#beforeWork()}
* and {@link ExecutorImpl#afterWork()} functions. Everything else is either handled by the executor,
* timeout state or monitor.
*/
private final class Worker implements Runnable {
private final class WorkerThread implements Runnable {
/**
* The index into the {@link #workers} array.
*/
@ -550,21 +557,21 @@ private final class Worker implements Runnable {
* Whether this runner is currently executing. This may be set to false when this worker terminates, or when
* we try to abandon a worker in the monitor.
*
* @see #workerFinished(Worker)
* @see #workerFinished(WorkerThread)
*/
final AtomicBoolean running = new AtomicBoolean(true);
/**
* The computer we're currently running.
*/
final AtomicReference<ComputerExecutor> currentExecutor = new AtomicReference<>(null);
final AtomicReference<ExecutorImpl> currentExecutor = new AtomicReference<>(null);
/**
* The last time we reported a stack trace, used to avoid spamming the logs.
*/
AtomicLong lastReport = new AtomicLong(Long.MIN_VALUE);
Worker(int index) {
WorkerThread(int index) {
this.index = index;
owner = workerFactory.newThread(this);
}
@ -579,10 +586,9 @@ public void run() {
}
private void runImpl() {
tasks:
while (running.get()) {
// Wait for an active queue to execute
ComputerExecutor executor;
ExecutorImpl executor;
computerLock.lock();
try {
idleWorkers.getAndIncrement();
@ -597,21 +603,18 @@ private void runImpl() {
computerLock.unlock();
}
// If we're trying to executing some task on this computer while someone else is doing work, something
// is seriously wrong.
while (!executor.executingThread.compareAndSet(null, owner)) {
var existing = executor.executingThread.get();
if (existing != null) {
LOG.error(
"Trying to run computer #{} on thread {}, but already running on {}. This is a SERIOUS bug, please report with your debug.log.",
executor.getComputer().getID(), owner.getName(), existing.getName()
);
continue tasks;
}
// Mark this computer as executing.
if (!ExecutorImpl.STATE.compareAndSet(executor, ExecutorState.ON_QUEUE, ExecutorState.RUNNING)) {
assert false : "Running computer on the wrong thread";
LOG.error(
"Trying to run computer #{} on thread {}, but already running on another thread. This is a SERIOUS " +
"bug, please report with your debug.log.",
executor.worker.getComputerID(), owner.getName()
);
}
// If we're stopping, the only thing this executor should be doing is shutting down.
if (state.get() >= STOPPING) executor.queueStop(false, true);
if (state.get() >= STOPPING) executor.worker.unload();
// Reset the timers
executor.beforeWork();
@ -622,19 +625,19 @@ private void runImpl() {
// Execute the task
try {
executor.work();
executor.worker.work();
} catch (Exception | LinkageError | VirtualMachineError e) {
LOG.error("Error running task on computer #" + executor.getComputer().getID(), e);
LOG.error("Error running task on computer #" + executor.worker.getComputerID(), e);
// Tear down the computer immediately. There's no guarantee it's well-behaved from now on.
executor.fastFail();
executor.worker.abortWithError();
} finally {
var thisExecutor = currentExecutor.getAndSet(null);
if (thisExecutor != null) afterWork(this, executor);
if (thisExecutor != null) afterWork(executor);
}
}
}
private void reportTimeout(ComputerExecutor executor, long time) {
private void reportTimeout(ExecutorImpl executor, long time) {
if (!LOG.isErrorEnabled(Logging.COMPUTER_ERROR)) return;
// Attempt to debounce stack trace reporting, limiting ourselves to one every second. There's no need to be
@ -647,8 +650,8 @@ private void reportTimeout(ComputerExecutor executor, long time) {
var owner = Objects.requireNonNull(this.owner);
var builder = new StringBuilder()
.append("Terminating computer #").append(executor.getComputer().getID())
.append(" due to timeout (running for ").append(time * 1e-9)
.append("Terminating computer #").append(executor.worker.getComputerID())
.append(" due to timeout (ran over by ").append(time * -1e-9)
.append(" seconds). This is NOT a bug, but may mean a computer is misbehaving.\n")
.append("Thread ")
.append(owner.getName())
@ -662,9 +665,150 @@ private void reportTimeout(ComputerExecutor executor, long time) {
builder.append(" at ").append(element).append('\n');
}
executor.printState(builder);
executor.worker.writeState(builder);
LOG.warn(builder.toString());
}
}
/**
 * The scheduling state of an {@link ExecutorImpl}.
 * <p>
 * An executor is either queued (it has more work to do) or not, and either running or not. The four constants of
 * this enum are the four combinations of those two properties, and move between each other as follows:
 *
 * <pre>{@code
 *            submit()                afterWork()
 *   IDLE ---------> ON_QUEUE <----------- REPEAT
 *    ^                 |                    ^
 *    |                 | runImpl()          |
 *    |                 V                    |
 *    +---------------RUNNING----------------+
 *       afterWork()              submit()
 * }</pre>
 */
enum ExecutorState {
    /**
     * Not queued and not running.
     */
    IDLE,

    /**
     * Waiting on the queue, but not yet running.
     */
    ON_QUEUE,

    /**
     * Currently running; becomes {@link #IDLE} once the current task finishes.
     */
    RUNNING,

    /**
     * Currently running, and must be queued again once the current task finishes.
     */
    REPEAT;

    /**
     * Compute the state after new work has been submitted.
     *
     * @return {@link #REPEAT} if the executor is currently running, otherwise {@link #ON_QUEUE}.
     */
    ExecutorState enqueue() {
        var currentlyRunning = this == RUNNING || this == REPEAT;
        return currentlyRunning ? REPEAT : ON_QUEUE;
    }

    /**
     * Compute the state after a task has finished executing.
     *
     * @return {@link #IDLE} if no further work was submitted while running, otherwise {@link #ON_QUEUE}.
     */
    ExecutorState requeue() {
        if (this == RUNNING) return ExecutorState.IDLE;
        if (this == REPEAT) return ExecutorState.ON_QUEUE;

        // IDLE or ON_QUEUE: we cannot have just finished a task without having been in a running state.
        assert false : "Impossible state after executing";
        LOG.error("Impossible state - calling requeue with {}.", this);
        return ExecutorState.ON_QUEUE;
    }
}
private final class ExecutorImpl implements Executor {
    /**
     * Atomic accessor for the {@code $state} field. The field is looked up by name, so the string literal
     * and the field declaration below must stay in sync.
     */
    public static final AtomicReferenceFieldUpdater<ExecutorImpl, ExecutorState> STATE =
        AtomicReferenceFieldUpdater.newUpdater(ExecutorImpl.class, ExecutorState.class, "$state");

    /**
     * The worker whose tasks this executor schedules.
     */
    final Worker worker;

    /**
     * Receives the execution time of each completed task.
     */
    private final MetricsObserver metrics;

    /**
     * The timeout state for the task currently being run.
     */
    final TimeoutImpl timeout;

    /**
     * The current scheduling state of this executor. Only modified through {@link #STATE}.
     */
    private volatile ExecutorState $state = ExecutorState.IDLE;

    /**
     * How much time this computer has consumed on an idealised machine which divides work evenly between all
     * computers.
     *
     * @see ComputerThread
     */
    long virtualRuntime = 0;

    /**
     * The timestamp at which {@link #virtualRuntime} was last brought up to date.
     *
     * @see ComputerThread
     */
    long vRuntimeStart;

    ExecutorImpl(Worker worker, MetricsObserver metrics) {
        this.worker = worker;
        this.metrics = metrics;
        this.timeout = new TimeoutImpl();
    }

    /**
     * Prepare for a call to {@link Worker#work()}: record when we started and arm the timeout.
     */
    void beforeWork() {
        vRuntimeStart = System.nanoTime();
        timeout.startTimer(scaledPeriod());
    }

    /**
     * Clean up after a call to {@link Worker#work()}: clear the timeout flags, report how long the task ran
     * for, and advance the scheduling state.
     *
     * @return Whether more work was submitted while running, and so this executor must be queued again.
     */
    boolean afterWork() {
        timeout.reset();
        metrics.observe(Metrics.COMPUTER_TASKS, timeout.getExecutionTime());

        return STATE.getAndUpdate(this, ExecutorState::requeue) == ExecutorState.REPEAT;
    }

    @Override
    public void submit() {
        // Only an idle executor needs adding to the queue: otherwise it is already there, or is running and
        // has now been flagged to run again.
        if (STATE.getAndUpdate(this, ExecutorState::enqueue) == ExecutorState.IDLE) queue(this);
    }

    @Override
    public TimeoutState timeoutState() {
        return timeout;
    }

    @Override
    public long getRemainingTime() {
        return timeout.getRemainingTime();
    }

    @Override
    public void setRemainingTime(long time) {
        timeout.setRemainingTime(time);
    }
}
/**
 * The {@link ManagedTimeoutState} used by executors of this scheduler.
 * <p>
 * Whether a computer should pause is decided by whether the enclosing scheduler reports any pending work.
 */
private final class TimeoutImpl extends ManagedTimeoutState {
@Override
protected boolean shouldPause() {
// Delegates to the enclosing scheduler's hasPendingWork().
return hasPendingWork();
}
}
}

View File

@ -0,0 +1,127 @@
// SPDX-FileCopyrightText: 2019 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.computer.computerthread;
import dan200.computercraft.core.computer.TimeoutState;
/**
* A basic {@link TimeoutState} implementation, for use by {@link ComputerScheduler} implementations.
* <p>
* While almost all {@link TimeoutState} implementations will be derived from this class, the two are intentionally kept
* separate. This class is intended for the {@link ComputerScheduler} (which is responsible for controlling the
* timeout), and not for the main computer logic, which only needs to check timeout flags.
* <p>
* This class tracks the time a computer was started (and thus {@linkplain #getExecutionTime()} how long it has been
* running for), as well as the deadline for when a computer should be soft aborted and paused.
*/
public abstract class ManagedTimeoutState extends TimeoutState {
    /**
     * When execution of this computer started, as a {@link System#nanoTime()} timestamp.
     *
     * @see #getExecutionTime()
     */
    private long startTime;

    /**
     * The time when this computer should be aborted, as a {@link System#nanoTime()} timestamp.
     *
     * @see #getRemainingTime()
     * @see #setRemainingTime(long)
     */
    private long abortDeadline;

    /**
     * The time when this computer should be paused if {@link ComputerThread#hasPendingWork()} is set, as a
     * {@link System#nanoTime()} timestamp.
     */
    private long pauseDeadline;

    /**
     * Re-evaluate the pause, soft-abort and hard-abort flags against the current time, notifying listeners if any
     * of them changed.
     */
    @Override
    public final synchronized void refresh() {
        // Important: nanoTime has an arbitrary origin, so its values may be negative and may wrap around. Two
        // timestamps must therefore be compared via the sign of their difference ("now - deadline >= 0") rather
        // than directly. A signed or unsigned comparison of the raw values gives the wrong answer whenever the
        // clock crosses the respective wrap-around point between starting the timer and refreshing it.
        var now = System.nanoTime();
        var changed = false;

        if (!paused && now - pauseDeadline >= 0 && shouldPause()) { // now >= pauseDeadline
            paused = true;
            changed = true;
        }

        if (!softAbort && now - abortDeadline >= 0) { // now >= abortDeadline
            softAbort = true;
            changed = true;
        }

        if (softAbort && !hardAbort && now - (abortDeadline + ABORT_TIMEOUT) >= 0) { // now >= abortDeadline + ABORT_TIMEOUT
            hardAbort = true;
            changed = true;
        }

        if (changed) updateListeners();
    }

    /**
     * Get how long this computer has been executing for.
     *
     * @return How long the computer has been running for in nanoseconds.
     */
    public final long getExecutionTime() {
        return System.nanoTime() - startTime;
    }

    /**
     * Get how long this computer is permitted to run before being aborted.
     *
     * @return The remaining time, in nanoseconds. This is negative once the abort deadline has passed.
     * @see ComputerScheduler.Executor#getRemainingTime()
     */
    public final long getRemainingTime() {
        return abortDeadline - System.nanoTime();
    }

    /**
     * Set how long this computer is permitted to run before being aborted.
     *
     * @param time The permitted run time in nanoseconds, measured from when the timer was started.
     * @see ComputerScheduler.Executor#setRemainingTime(long)
     */
    public final void setRemainingTime(long time) {
        abortDeadline = startTime + time;
    }

    /**
     * Set the hard-abort flag immediately.
     */
    public final void hardAbort() {
        softAbort = hardAbort = true;
        synchronized (this) {
            updateListeners();
        }
    }

    /**
     * Start this timer, recording the current start time, and deadline before a computer may be paused.
     * <p>
     * The abort deadline is reset to {@code BASE_TIMEOUT} from now.
     *
     * @param pauseTimeout The minimum time this computer can run before potentially being paused.
     */
    public final synchronized void startTimer(long pauseTimeout) {
        var now = System.nanoTime();
        startTime = now;
        abortDeadline = now + BASE_TIMEOUT;
        pauseDeadline = now + pauseTimeout;
    }

    /**
     * Clear the paused and abort flags.
     */
    public final synchronized void reset() {
        paused = softAbort = hardAbort = false;
        updateListeners();
    }

    /**
     * Determine if this computer should be paused, as other computers are contending for work.
     *
     * @return If this computer should be paused.
     */
    protected abstract boolean shouldPause();
}

View File

@ -0,0 +1,142 @@
// SPDX-FileCopyrightText: 2023 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.computer.computerthread;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.MetricsObserver;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.concurrent.GuardedBy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
/**
 * Test helper which owns a single-threaded {@link ComputerThread} and runs simple {@link Worker}s on it.
 * <p>
 * Any {@link Throwable} thrown by a worker is recorded, and rethrown from {@link #startAndWait(Worker)} on the
 * calling thread.
 */
public class ComputerThreadRunner implements AutoCloseable {
    private final ComputerThread thread;

    private final Lock errorLock = new ReentrantLock();
    private final @GuardedBy("errorLock") Condition hasError = errorLock.newCondition();

    /**
     * The first error thrown by any worker. Later errors are attached to it as suppressed exceptions.
     */
    @GuardedBy("errorLock")
    private @MonotonicNonNull Throwable error = null;

    public ComputerThreadRunner() {
        this.thread = new ComputerThread(1);
    }

    /**
     * Get the underlying scheduler.
     *
     * @return The computer thread workers are run on.
     */
    public ComputerThread thread() {
        return thread;
    }

    /**
     * Stop the underlying {@link ComputerThread}, waiting up to 10 seconds for it to shut down.
     *
     * @throws IllegalStateException If the thread did not stop in time, or the calling thread was interrupted
     *                               while waiting. In the latter case the interrupt flag is restored.
     */
    @Override
    public void close() {
        try {
            if (!thread.stop(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Failed to shutdown ComputerContext in time.");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status, so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Runtime thread was interrupted", e);
        }
    }

    /**
     * Create a worker which runs the given action each time it is executed.
     *
     * @param action The action to run. This receives the worker's executor and that executor's timeout state.
     * @return The new worker. It is not submitted for execution yet.
     */
    public Worker createWorker(BiConsumer<ComputerScheduler.Executor, TimeoutState> action) {
        return new Worker(thread, e -> action.accept(e, e.timeoutState()));
    }

    /**
     * Create and submit a worker which sleeps for 100ms and then resubmits itself, so that it always has work.
     */
    public void createLoopingComputer() {
        new Worker(thread, e -> {
            Thread.sleep(100);
            e.submit();
        }).executor().submit();
    }

    /**
     * Submit a worker and wait until it has executed at least once and the thread has no more pending work.
     *
     * @param worker The worker to run.
     * @throws Exception Any error thrown by a running worker.
     */
    public void startAndWait(Worker worker) throws Exception {
        worker.executor().submit();

        do {
            errorLock.lock();
            try {
                rethrowIfNeeded();
                // Wake up early if an error is reported, otherwise poll again after 100ms.
                if (hasError.await(100, TimeUnit.MILLISECONDS)) rethrowIfNeeded();
            } finally {
                errorLock.unlock();
            }
        } while (worker.executed == 0 || thread.hasPendingWork());
    }

    @GuardedBy("errorLock")
    private void rethrowIfNeeded() throws Exception {
        if (error != null) ComputerThreadRunner.<Exception>throwUnchecked0(error);
    }

    /**
     * Throw an arbitrary {@link Throwable} without wrapping it, bypassing checked-exception checks.
     */
    @SuppressWarnings("unchecked")
    private static <T extends Throwable> void throwUnchecked0(Throwable t) throws T {
        throw (T) t;
    }

    /**
     * A single unit of work run by a {@link Worker}.
     */
    @FunctionalInterface
    private interface Task {
        void run(ComputerScheduler.Executor executor) throws InterruptedException;
    }

    /**
     * A {@link ComputerScheduler.Worker} which runs a fixed {@link Task}, counting successful executions and
     * recording any error on the enclosing runner.
     */
    public final class Worker implements ComputerScheduler.Worker {
        private final Task run;
        private final ComputerScheduler.Executor executor;

        /**
         * The number of times the task has completed without throwing.
         */
        volatile int executed = 0;

        private Worker(ComputerScheduler scheduler, Task run) {
            this.run = run;
            this.executor = scheduler.createExecutor(this, MetricsObserver.discard());
        }

        public ComputerScheduler.Executor executor() {
            return executor;
        }

        @Override
        public void work() {
            try {
                run.run(executor);
                executed++;
            } catch (Throwable e) {
                errorLock.lock();
                try {
                    if (error == null) {
                        error = e;
                        hasError.signal();
                    } else {
                        error.addSuppressed(e);
                    }
                } finally {
                    errorLock.unlock();
                }

                // Exceptions and assertion failures are reported via startAndWait; anything else is rethrown.
                if (e instanceof Exception || e instanceof AssertionError) return;
                throwUnchecked0(e);
            }
        }

        @Override
        public int getComputerID() {
            return 0;
        }

        @Override
        public void writeState(StringBuilder output) {
        }

        @Override
        public void abortWithTimeout() {
        }

        @Override
        public void unload() {
        }

        @Override
        public void abortWithError() {
        }
    }
}

View File

@ -2,11 +2,10 @@
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.computer;
package dan200.computercraft.core.computer.computerthread;
import dan200.computercraft.core.lua.MachineResult;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.test.core.ConcurrentHelpers;
import dan200.computercraft.test.core.computer.KotlinComputerManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -28,11 +27,11 @@
@Execution(ExecutionMode.CONCURRENT)
public class ComputerThreadTest {
private static final Logger LOG = LoggerFactory.getLogger(ComputerThreadTest.class);
private KotlinComputerManager manager;
private ComputerThreadRunner manager;
@BeforeEach
public void before() {
manager = new KotlinComputerManager();
manager = new ComputerThreadRunner();
}
@AfterEach
@ -42,16 +41,13 @@ public void after() {
@Test
public void testSoftAbort() throws Exception {
var computer = manager.create();
manager.enqueue(computer, timeout -> {
var computer = manager.createWorker((executor, timeout) -> {
executor.setRemainingTime(TimeoutState.TIMEOUT);
assertFalse(timeout.isSoftAborted(), "Should not start soft-aborted");
var delay = ConcurrentHelpers.waitUntil(timeout::isSoftAborted);
assertThat("Should be soft aborted", delay * 1e-9, closeTo(7, 1.0));
LOG.info("Slept for {}", delay);
computer.shutdown();
return MachineResult.OK;
});
manager.startAndWait(computer);
@ -59,15 +55,12 @@ public void testSoftAbort() throws Exception {
@Test
public void testHardAbort() throws Exception {
var computer = manager.create();
manager.enqueue(computer, timeout -> {
var computer = manager.createWorker((executor, timeout) -> {
executor.setRemainingTime(TimeoutState.TIMEOUT);
assertFalse(timeout.isHardAborted(), "Should not start soft-aborted");
assertThrows(InterruptedException.class, () -> Thread.sleep(11_000), "Sleep should be hard aborted");
assertTrue(timeout.isHardAborted(), "Thread should be hard aborted");
computer.shutdown();
return MachineResult.OK;
});
manager.startAndWait(computer);
@ -75,13 +68,9 @@ public void testHardAbort() throws Exception {
@Test
public void testNoPauseIfNoOtherMachines() throws Exception {
var computer = manager.create();
manager.enqueue(computer, timeout -> {
var computer = manager.createWorker((executor, timeout) -> {
var didPause = ConcurrentHelpers.waitUntil(timeout::isPaused, 5, TimeUnit.SECONDS);
assertFalse(didPause, "Machine shouldn't have paused within 5s");
computer.shutdown();
return MachineResult.OK;
});
manager.startAndWait(computer);
@ -89,18 +78,14 @@ public void testNoPauseIfNoOtherMachines() throws Exception {
@Test
public void testPauseIfSomeOtherMachine() throws Exception {
var computer = manager.create();
manager.enqueue(computer, timeout -> {
var budget = manager.context().computerScheduler().scaledPeriod();
var computer = manager.createWorker((executor, timeout) -> {
var budget = manager.thread().scaledPeriod();
assertEquals(budget, TimeUnit.MILLISECONDS.toNanos(25), "Budget should be 25ms");
var delay = ConcurrentHelpers.waitUntil(timeout::isPaused);
// Linux appears to have much more accurate timing than Windows/OSX. Or at least on CI!
var time = System.getProperty("os.name", "").toLowerCase(Locale.ROOT).contains("linux") ? 0.05 : 0.3;
assertThat("Paused within a short time", delay * 1e-9, lessThanOrEqualTo(time));
computer.shutdown();
return MachineResult.OK;
});
manager.createLoopingComputer();

View File

@ -1,189 +0,0 @@
// SPDX-FileCopyrightText: 2022 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.test.core.computer
import dan200.computercraft.api.lua.ILuaAPI
import dan200.computercraft.core.ComputerContext
import dan200.computercraft.core.computer.Computer
import dan200.computercraft.core.computer.TimeoutState
import dan200.computercraft.core.lua.MachineEnvironment
import dan200.computercraft.core.lua.MachineResult
import dan200.computercraft.core.terminal.Terminal
import java.util.*
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
typealias FakeComputerTask = (state: TimeoutState) -> MachineResult
/**
* Creates "fake" computers, which just run user-defined tasks rather than Lua code.
*/
class KotlinComputerManager : AutoCloseable {
// The task queue belonging to each computer created by [create].
private val machines: MutableMap<Computer, Queue<FakeComputerTask>> = HashMap()
// A context whose Lua machines are all DummyLuaMachine instances.
private val context = ComputerContext.builder(BasicEnvironment())
.luaFactory { env, _ -> DummyLuaMachine(env) }
.build()
private val errorLock: Lock = ReentrantLock()
private val hasError = errorLock.newCondition()
// The first error reported by a machine; later errors are added to it as suppressed exceptions.
@Volatile
private var error: Throwable? = null
/**
* Close the underlying context, allowing it 10 seconds to do so.
*
* @throws IllegalStateException If interrupted while waiting.
*/
override fun close() {
try {
context.ensureClosed(10, TimeUnit.SECONDS)
} catch (e: InterruptedException) {
throw IllegalStateException("Runtime thread was interrupted", e)
}
}
/**
* Get the context that computers made by this manager are created with.
*/
fun context(): ComputerContext {
return context
}
/**
* Create a new computer which pulls from our task queue.
*
* @return The computer. This will not be started yet, you must call [Computer.turnOn] and
* [Computer.tick] to do so.
*/
fun create(): Computer {
val queue: Queue<FakeComputerTask> = ConcurrentLinkedQueue()
val computer = Computer(
context,
BasicEnvironment(),
Terminal(51, 19, true),
0,
)
computer.addApi(QueuePassingAPI(queue)) // Inject an extra API to pass the queue to the machine.
machines[computer] = queue
return computer
}
/**
* Create and start a new computer which loops forever.
*/
fun createLoopingComputer() {
val computer = create()
enqueueForever(computer) {
Thread.sleep(100)
MachineResult.OK
}
computer.turnOn()
computer.tick()
}
/**
* Enqueue a task on a computer.
*
* @param computer The computer to enqueue the work on.
* @param task The task to run.
*/
fun enqueue(computer: Computer, task: FakeComputerTask) {
machines[computer]!!.offer(task)
}
/**
* Enqueue a repeated task on a computer. This is automatically requeued when the task finishes, meaning the task
* queue is never empty.
*
* @param computer The computer to enqueue the work on.
* @param task The task to run.
*/
private fun enqueueForever(computer: Computer, task: FakeComputerTask) {
machines[computer]!!.offer {
val result = task(it)
// Put the task back on the queue, then queue an event on the computer.
enqueueForever(computer, task)
computer.queueEvent("some_event", null)
result
}
}
/**
* Sleep for a given period, immediately propagating any exceptions thrown by a computer.
*
* @param delay The duration to sleep for.
* @param unit The time unit the duration is measured in.
* @throws Exception An exception thrown by a running computer.
*/
@Throws(Exception::class)
fun sleep(delay: Long, unit: TimeUnit?) {
errorLock.lock()
try {
rethrowIfNeeded()
if (hasError.await(delay, unit)) rethrowIfNeeded()
} finally {
errorLock.unlock()
}
}
/**
* Start a computer and wait for it to finish.
*
* @param computer The computer to wait for.
* @throws Exception An exception thrown by a running computer.
*/
@Throws(Exception::class)
fun startAndWait(computer: Computer) {
computer.turnOn()
computer.tick()
do {
sleep(100, TimeUnit.MILLISECONDS)
} while (context.computerScheduler().hasPendingWork() || computer.isOn)
rethrowIfNeeded()
}
/**
* Throw the recorded error, if one has been reported.
*/
@Throws(Exception::class)
private fun rethrowIfNeeded() {
val error = error ?: return
throw error
}
/**
* An API which exposes no names, and exists only to carry a computer's task queue to its machine.
*/
private class QueuePassingAPI constructor(val tasks: Queue<FakeComputerTask>) : ILuaAPI {
override fun getNames(): Array<String> = arrayOf()
}
/**
* A machine which runs tasks from the queue held by its [QueuePassingAPI], rather than Lua code.
*/
private inner class DummyLuaMachine(private val environment: MachineEnvironment) : KotlinLuaMachine(environment) {
private val tasks: Queue<FakeComputerTask> =
environment.apis.asSequence().filterIsInstance(QueuePassingAPI::class.java).first().tasks
/**
* Take the next task off the queue, wrapped so that any error it throws is reported to the manager.
*
* @return The wrapped task, or `null` if a task could not be taken from the queue.
*/
override fun getTask(): (suspend KotlinLuaMachine.() -> Unit)? {
try {
// Queue.remove() throws if the queue is empty, which is reported as an error below.
val task = tasks.remove()
return {
try {
task(environment.timeout)
} catch (e: Throwable) {
reportError(e)
}
}
} catch (e: Throwable) {
reportError(e)
return null
}
}
override fun close() {}
/**
* Record an error and signal [hasError]. Anything other than an [Exception] or [AssertionError] is
* rethrown afterwards.
*/
private fun reportError(e: Throwable) {
errorLock.lock()
try {
if (error == null) {
error = e
hasError.signal()
} else {
error!!.addSuppressed(e)
}
} finally {
errorLock.unlock()
}
if (e is Exception || e is AssertionError) return else throw e
}
}
}

View File

@ -1,61 +0,0 @@
// SPDX-FileCopyrightText: 2023 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.computer;
import cc.tweaked.web.js.Callbacks;
import org.teavm.jso.browser.TimerHandler;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
/**
 * A reimplementation of {@link ComputerThread} which, well, avoids any threading!
 * <p>
 * This instead just executes work as soon as possible via {@link Callbacks#setImmediate(TimerHandler)}. Timeouts are
 * instead handled via polling, see {@link cc.tweaked.web.builder.PatchCobalt}.
 */
public class TComputerThread {
    /** Executors waiting to run. Note that this queue is shared between all instances. */
    private static final ArrayDeque<ComputerExecutor> executors = new ArrayDeque<>();

    private final TimerHandler callback = this::workOnce;

    /**
     * Create a new scheduler.
     *
     * @param threads Ignored by this implementation.
     */
    public TComputerThread(int threads) {
    }

    /**
     * Add an executor to the back of the queue, scheduling a callback if the queue was previously empty.
     *
     * @param executor The executor to queue.
     * @throws IllegalStateException If the executor is already marked as being on the queue.
     */
    public void queue(ComputerExecutor executor) {
        if (executor.onComputerQueue) throw new IllegalStateException("Cannot queue already queued executor");
        executor.onComputerQueue = true;

        // Only an empty queue needs a fresh callback: otherwise workOnce() reschedules itself.
        var wasIdle = executors.isEmpty();
        if (wasIdle) Callbacks.setImmediate(callback);
        executors.addLast(executor);
    }

    /**
     * Run a single executor from the front of the queue, then schedule another callback if work remains.
     */
    private void workOnce() {
        var current = executors.pollFirst();
        if (current == null) throw new IllegalStateException("Working, but executor is null");
        if (!current.onComputerQueue) throw new IllegalArgumentException("Working but not on queue");

        current.beforeWork();
        try {
            current.work();
        } catch (Exception e) {
            e.printStackTrace();
        }

        // An executor which asks to run again goes back on the front of the queue.
        boolean runAgain = current.afterWork();
        if (runAgain) executors.addFirst(current);

        if (!executors.isEmpty()) Callbacks.setImmediate(callback);
    }

    /**
     * Whether there is pending work. This implementation unconditionally reports {@code true}.
     *
     * @return Always {@code true}.
     */
    public boolean hasPendingWork() {
        return true;
    }

    /**
     * The period computers are given to run for.
     *
     * @return A fixed value of {@code 50 * 1_000_000} (presumably 50ms expressed in nanoseconds).
     */
    public long scaledPeriod() {
        return 50 * 1_000_000L;
    }

    /**
     * Stop this scheduler. There is nothing to stop here, so this succeeds immediately.
     *
     * @param timeout Ignored.
     * @param unit    Ignored.
     * @return Always {@code true}.
     */
    public boolean stop(long timeout, TimeUnit unit) {
        return true;
    }
}

View File

@ -0,0 +1,114 @@
// SPDX-FileCopyrightText: 2023 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.computer.computerthread;
import cc.tweaked.web.js.Callbacks;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.MetricsObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.teavm.jso.browser.TimerHandler;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
/**
 * An implementation of {@link ComputerScheduler} which executes work as soon as possible via
 * {@link Callbacks#setImmediate(TimerHandler)}.
 * <p>
 * Timeouts are handled via polling, see {@link cc.tweaked.web.builder.PatchCobalt}.
 *
 * @see ComputerThread
 */
public class TComputerThread implements ComputerScheduler {
    private static final Logger LOG = LoggerFactory.getLogger(TComputerThread.class);

    /** The period passed to each executor's timer before it runs (presumably 50ms in nanoseconds). */
    private static final long SCALED_PERIOD = 50 * 1_000_000L;

    /** Executors waiting to run. This queue is shared between all scheduler instances. */
    private static final ArrayDeque<ExecutorImpl> executors = new ArrayDeque<>();

    private static final TimerHandler callback = TComputerThread::workOnce;

    /**
     * Whether a {@link #workOnce()} callback is currently scheduled but has not yet started.
     * <p>
     * An executor may be submitted while {@link #workOnce()} is still running (its {@code onQueue} flag is cleared
     * before the worker runs). Tracking the pending callback explicitly ensures we never schedule two callbacks for
     * one queue entry, which would otherwise leave a surplus callback to fire against an empty queue.
     */
    private static boolean callbackScheduled = false;

    /**
     * Create a new scheduler.
     *
     * @param threads Ignored by this implementation.
     */
    public TComputerThread(int threads) {
    }

    @Override
    public Executor createExecutor(Worker worker, MetricsObserver metrics) {
        return new ExecutorImpl(worker);
    }

    /**
     * Schedule {@link #workOnce()} to run, unless a callback is already pending.
     */
    private static void scheduleWork() {
        if (callbackScheduled) return;
        callbackScheduled = true;
        Callbacks.setImmediate(callback);
    }

    /**
     * Run a single executor from the front of the queue, then schedule another callback if work remains.
     */
    private static void workOnce() {
        // This callback is now running, so any work queued from here on needs a fresh one.
        callbackScheduled = false;

        var executor = executors.poll();
        if (executor == null) throw new IllegalStateException("Working, but executor is null");

        executor.beforeWork();
        try {
            executor.worker.work();
        } catch (Exception e) {
            LOG.error("Error running computer", e);
            executor.worker.abortWithError();
        }
        executor.afterWork();

        // A no-op if submit() already scheduled a callback while the worker was running.
        if (!executors.isEmpty()) scheduleWork();
    }

    /**
     * Stop this scheduler. There is nothing to stop here, so this succeeds immediately.
     *
     * @param timeout Ignored.
     * @param unit    Ignored.
     * @return Always {@code true}.
     */
    @Override
    public boolean stop(long timeout, TimeUnit unit) {
        return true;
    }

    /**
     * The {@link Executor} for our scheduler.
     */
    private static final class ExecutorImpl implements ComputerScheduler.Executor {
        final ComputerScheduler.Worker worker;
        private final TimeoutImpl timeout = new TimeoutImpl();

        /** Whether this executor is currently in {@link #executors}. */
        private boolean onQueue;

        private ExecutorImpl(Worker worker) {
            this.worker = worker;
        }

        /**
         * Add this executor to the queue, if it is not already queued, and ensure a callback is scheduled to run it.
         */
        @Override
        public void submit() {
            if (onQueue) return;
            onQueue = true;

            executors.add(this);
            scheduleWork();
        }

        /**
         * Mark this executor as no longer queued and start its timer, ready for the worker to run.
         *
         * @throws IllegalArgumentException If this executor was not marked as queued.
         */
        void beforeWork() {
            if (!onQueue) throw new IllegalArgumentException("Working but not on queue");
            onQueue = false;
            timeout.startTimer(SCALED_PERIOD);
        }

        /**
         * Reset the timer once the worker has finished running.
         */
        void afterWork() {
            timeout.reset();
        }

        @Override
        public TimeoutState timeoutState() {
            return timeout;
        }

        @Override
        public long getRemainingTime() {
            return timeout.getRemainingTime();
        }

        @Override
        public void setRemainingTime(long time) {
            timeout.setRemainingTime(time);
        }
    }

    /**
     * The timeout state for our executors, which always reports that the computer should pause.
     */
    private static final class TimeoutImpl extends ManagedTimeoutState {
        @Override
        protected boolean shouldPause() {
            return true;
        }
    }
}