diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml index 975cfcf4d..0adcb6220 100644 --- a/config/checkstyle/checkstyle.xml +++ b/config/checkstyle/checkstyle.xml @@ -112,7 +112,9 @@ SPDX-License-Identifier: MPL-2.0 - + + + @@ -122,7 +124,7 @@ SPDX-License-Identifier: MPL-2.0 - + diff --git a/projects/core/src/main/java/dan200/computercraft/core/ComputerContext.java b/projects/core/src/main/java/dan200/computercraft/core/ComputerContext.java index 3a9684b7d..d251e2a7e 100644 --- a/projects/core/src/main/java/dan200/computercraft/core/ComputerContext.java +++ b/projects/core/src/main/java/dan200/computercraft/core/ComputerContext.java @@ -9,6 +9,7 @@ import dan200.computercraft.core.asm.GenericMethod; import dan200.computercraft.core.asm.LuaMethodSupplier; import dan200.computercraft.core.asm.PeripheralMethodSupplier; import dan200.computercraft.core.computer.GlobalEnvironment; +import dan200.computercraft.core.computer.computerthread.ComputerScheduler; import dan200.computercraft.core.computer.computerthread.ComputerThread; import dan200.computercraft.core.computer.mainthread.MainThreadScheduler; import dan200.computercraft.core.computer.mainthread.NoWorkMainThreadScheduler; @@ -31,7 +32,7 @@ import java.util.concurrent.TimeUnit; */ public final class ComputerContext { private final GlobalEnvironment globalEnvironment; - private final ComputerThread computerScheduler; + private final ComputerScheduler computerScheduler; private final MainThreadScheduler mainThreadScheduler; private final ILuaMachine.Factory luaFactory; private final List apiFactories; @@ -39,7 +40,7 @@ public final class ComputerContext { private final MethodSupplier peripheralMethods; ComputerContext( - GlobalEnvironment globalEnvironment, ComputerThread computerScheduler, + GlobalEnvironment globalEnvironment, ComputerScheduler computerScheduler, MainThreadScheduler mainThreadScheduler, ILuaMachine.Factory luaFactory, List apiFactories, MethodSupplier luaMethods, MethodSupplier peripheralMethods @@ -68,7 +69,7 @@ public final class ComputerContext { * * @return The current computer thread manager. */ - public ComputerThread computerScheduler() { + public ComputerScheduler computerScheduler() { return computerScheduler; } @@ -162,7 +163,7 @@ public final class ComputerContext { */ public static class Builder { private final GlobalEnvironment environment; - private int threads = 1; + private @Nullable ComputerScheduler computerScheduler = null; private @Nullable MainThreadScheduler mainThreadScheduler; private @Nullable ILuaMachine.Factory luaFactory; private @Nullable List apiFactories; @@ -173,7 +174,7 @@ public final class ComputerContext { } /** - * Set the number of threads the {@link ComputerThread} will use. + * Set the {@link #computerScheduler()} to use {@link ComputerThread} with a given number of threads. * * @param threads The number of threads to use. * @return {@code this}, for chaining @@ -181,7 +182,20 @@ public final class ComputerContext { */ public Builder computerThreads(int threads) { if (threads < 1) throw new IllegalArgumentException("Threads must be >= 1"); - this.threads = threads; + return computerScheduler(new ComputerThread(threads)); + } + + /** + * Set the {@link ComputerScheduler} for this context. + * + * @param scheduler The computer thread scheduler. + * @return {@code this}, for chaining + * @see ComputerContext#mainThreadScheduler() + */ + public Builder computerScheduler(ComputerScheduler scheduler) { + Objects.requireNonNull(scheduler); + if (computerScheduler != null) throw new IllegalStateException("Computer scheduler already specified"); + computerScheduler = scheduler; return this; } @@ -250,7 +264,7 @@ public final class ComputerContext { public ComputerContext build() { return new ComputerContext( environment, - new ComputerThread(threads), + computerScheduler == null ? new ComputerThread(1) : computerScheduler, mainThreadScheduler == null ? new NoWorkMainThreadScheduler() : mainThreadScheduler, luaFactory == null ? CobaltLuaMachine::new : luaFactory, apiFactories == null ? List.of() : apiFactories, diff --git a/projects/core/src/main/java/dan200/computercraft/core/computer/ComputerExecutor.java b/projects/core/src/main/java/dan200/computercraft/core/computer/ComputerExecutor.java index be07401a6..73ac27f1c 100644 --- a/projects/core/src/main/java/dan200/computercraft/core/computer/ComputerExecutor.java +++ b/projects/core/src/main/java/dan200/computercraft/core/computer/ComputerExecutor.java @@ -10,6 +10,7 @@ import dan200.computercraft.api.lua.ILuaAPI; import dan200.computercraft.core.ComputerContext; import dan200.computercraft.core.CoreConfig; import dan200.computercraft.core.apis.*; +import dan200.computercraft.core.computer.computerthread.ComputerScheduler; import dan200.computercraft.core.computer.computerthread.ComputerThread; import dan200.computercraft.core.filesystem.FileSystem; import dan200.computercraft.core.filesystem.FileSystemException; @@ -18,7 +19,6 @@ import dan200.computercraft.core.lua.MachineEnvironment; import dan200.computercraft.core.lua.MachineException; import dan200.computercraft.core.methods.LuaMethod; import dan200.computercraft.core.methods.MethodSupplier; -import dan200.computercraft.core.metrics.Metrics; import dan200.computercraft.core.metrics.MetricsObserver; import dan200.computercraft.core.util.Colour; import dan200.computercraft.core.util.Nullability; @@ -26,13 +26,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.io.InputStream; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.List; import java.util.Queue; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; /** @@ -55,7 +55,7 @@ import java.util.concurrent.locks.ReentrantLock; * One final responsibility for the executor is calling {@link ILuaAPI#update()} every tick, via the {@link #tick()} * method. This should only be called when the computer is actually on ({@link #isOn}). */ -final class ComputerExecutor { +final class ComputerExecutor implements ComputerScheduler.Worker { private static final Logger LOG = LoggerFactory.getLogger(ComputerExecutor.class); private static final int QUEUE_LIMIT = 256; @@ -63,9 +63,7 @@ final class ComputerExecutor { private final ComputerEnvironment computerEnvironment; private final MetricsObserver metrics; private final List apis = new ArrayList<>(); - private final ComputerThread scheduler; private final MethodSupplier luaMethods; - final TimeoutState timeout; private @Nullable FileSystem fileSystem; @@ -93,34 +91,11 @@ final class ComputerExecutor { private final ReentrantLock isOnLock = new ReentrantLock(); /** - * A lock used for any changes to {@link #eventQueue}, {@link #command} or {@link #onComputerQueue}. This will be - * used on the main thread, so locks should be kept as brief as possible. + * A lock used for any changes to {@link #eventQueue} or {@link #command}. This will be used on the main thread, + * so locks should be kept as brief as possible. */ private final Object queueLock = new Object(); - /** - * Determines if this executor is present within {@link ComputerThread}. - * - * @see #queueLock - * @see #enqueue() - * @see #afterWork() - */ - volatile boolean onComputerQueue = false; - - /** - * The amount of time this computer has used on a theoretical machine which shares work evenly amongst computers. - * - * @see ComputerThread - */ - long virtualRuntime = 0; - - /** - * The last time at which we updated {@link #virtualRuntime}. - * - * @see ComputerThread - */ - long vRuntimeStart; - /** * The command that {@link #work()} should execute on the computer thread. *

@@ -130,6 +105,7 @@ final class ComputerExecutor { * Note, if command is not {@code null}, then some command is scheduled to be executed. Otherwise it is not * currently in the queue (or is currently being executed). */ + @GuardedBy("queueLock") private volatile @Nullable StateCommand command; /** @@ -137,43 +113,45 @@ final class ComputerExecutor { *

* Note, this should be empty if this computer is off - it is cleared on shutdown and when turning on again. */ + @GuardedBy("queueLock") private final Queue eventQueue = new ArrayDeque<>(4); /** - * Whether we interrupted an event and so should resume it instead of executing another task. + * Whether this computer was paused (and so should resume without pulling an event) or not. * + * @see #timeRemaining * @see #work() * @see #resumeMachine(String, Object[]) */ - private boolean interruptedEvent = false; + private boolean wasPaused; + + /** + * The amount of time this computer can run for before being interrupted. This is only defined when + * {@link #wasPaused} is set. + */ + private long timeRemaining = 0; /** * Whether this executor has been closed, and will no longer accept any incoming commands or events. * * @see #queueStop(boolean, boolean) */ + @GuardedBy("queueLock") private boolean closed; private @Nullable WritableMount rootMount; - /** - * The thread the executor is running on. This is non-null when performing work. We use this to ensure we're only - * doing one bit of work at one time. - * - * @see ComputerThread - */ - final AtomicReference executingThread = new AtomicReference<>(); - private final ILuaMachine.Factory luaFactory; + private final ComputerScheduler.Executor executor; + ComputerExecutor(Computer computer, ComputerEnvironment computerEnvironment, ComputerContext context) { this.computer = computer; this.computerEnvironment = computerEnvironment; metrics = computerEnvironment.getMetrics(); luaFactory = context.luaFactory(); - scheduler = context.computerScheduler(); luaMethods = context.luaMethods(); - timeout = new TimeoutState(scheduler); + executor = context.computerScheduler().createExecutor(this, metrics); var environment = computer.getEnvironment(); @@ -193,6 +171,11 @@ final class ComputerExecutor { } } + @Override + public int getComputerID() { + return computer.getID(); + } + boolean isOn() { return isOn; } @@ -203,10 +186,6 @@ final class ComputerExecutor { return fileSystem; } - Computer getComputer() { - return computer; - } - void addApi(ILuaAPI api) { apis.add(new ApiWrapper(api, null)); } @@ -220,8 +199,8 @@ final class ComputerExecutor { if (closed || isOn || command != null) return; command = StateCommand.TURN_ON; - enqueue(); } + enqueue(); } /** @@ -246,24 +225,31 @@ final class ComputerExecutor { } command = newCommand; - enqueue(); } + enqueue(); } /** * Abort this whole computer due to a timeout. This will immediately destroy the Lua machine, * and then schedule a shutdown. */ - void abort() { - immediateFail(StateCommand.ABORT); + @Override + public void abortWithTimeout() { + immediateFail(StateCommand.ABORT_WITH_TIMEOUT); } /** * Abort this whole computer due to an internal error. This will immediately destroy the Lua machine, * and then schedule a shutdown. */ - void fastFail() { - immediateFail(StateCommand.ERROR); + @Override + public void abortWithError() { + immediateFail(StateCommand.ABORT_WITH_ERROR); + } + + @Override + public void unload() { + queueStop(false, true); } private void immediateFail(StateCommand command) { @@ -294,17 +280,15 @@ final class ComputerExecutor { if (closed || command != null || eventQueue.size() >= QUEUE_LIMIT) return; eventQueue.offer(new Event(event, args)); - enqueue(); } + enqueue(); } /** * Add this executor to the {@link ComputerThread} if not already there. */ private void enqueue() { - synchronized (queueLock) { - if (!onComputerQueue) scheduler.queue(this); - } + executor.submit(); } /** @@ -385,7 +369,7 @@ final class ComputerExecutor { // Create the lua machine try (var bios = biosStream) { return luaFactory.create(new MachineEnvironment( - new LuaContext(computer), metrics, timeout, + new LuaContext(computer), metrics, executor.timeoutState(), () -> apis.stream().map(ApiWrapper::api).iterator(), luaMethods, computer.getGlobalEnvironment().getHostString() @@ -405,7 +389,6 @@ final class ComputerExecutor { try { // Reset the terminal and event queue computer.getTerminal().reset(); - interruptedEvent = false; synchronized (queueLock) { eventQueue.clear(); } @@ -433,15 +416,15 @@ final class ComputerExecutor { isOnLock.unlock(); } - // Now actually start the computer, now that everything is set up. - resumeMachine(null, null); + // Mark the Lua VM as ready to be executed next time. + wasPaused = true; + timeRemaining = TimeoutState.TIMEOUT; } private void shutdown() throws InterruptedException { isOnLock.lockInterruptibly(); try { - isOn = false; - interruptedEvent = false; + isOn = wasPaused = false; synchronized (queueLock) { eventQueue.clear(); } @@ -469,36 +452,6 @@ final class ComputerExecutor { } } - /** - * Called before calling {@link #work()}, setting up any important state. - */ - void beforeWork() { - vRuntimeStart = System.nanoTime(); - timeout.startTimer(); - } - - /** - * Called after executing {@link #work()}. - * - * @return If we have more work to do. - */ - boolean afterWork() { - if (interruptedEvent) { - timeout.pauseTimer(); - } else { - timeout.stopTimer(); - } - - metrics.observe(Metrics.COMPUTER_TASKS, timeout.nanoCurrent()); - - if (interruptedEvent) return true; - - synchronized (queueLock) { - if (eventQueue.isEmpty() && command == null) return onComputerQueue = false; - return true; - } - } - /** * The main worker function, called by {@link ComputerThread}. *

@@ -508,15 +461,15 @@ final class ComputerExecutor { * @see #command * @see #eventQueue */ - void work() throws InterruptedException { - if (interruptedEvent && !closed) { - interruptedEvent = false; - if (machine != null) { - resumeMachine(null, null); - return; - } + @Override + public void work() throws InterruptedException { + workImpl(); + synchronized (queueLock) { + if (wasPaused || command != null || !eventQueue.isEmpty()) enqueue(); } + } + private void workImpl() throws InterruptedException { StateCommand command; Event event = null; synchronized (queueLock) { @@ -524,7 +477,7 @@ final class ComputerExecutor { this.command = null; // If we've no command, pull something from the event queue instead. - if (command == null) { + if (command == null && !wasPaused) { if (!isOn) { // We're not on and had no command, but we had work queued. This should never happen, so clear // the event queue just in case. @@ -537,6 +490,7 @@ final class ComputerExecutor { } if (command != null) { + wasPaused = false; switch (command) { case TURN_ON -> { if (isOn) return; @@ -553,23 +507,29 @@ final class ComputerExecutor { shutdown(); computer.turnOn(); } - case ABORT -> { + case ABORT_WITH_TIMEOUT -> { if (!isOn) return; displayFailure("Error running computer", TimeoutState.ABORT_MESSAGE); shutdown(); } - case ERROR -> { + case ABORT_WITH_ERROR -> { if (!isOn) return; displayFailure("Error running computer", "An internal error occurred, see logs."); shutdown(); } } + } else if (wasPaused) { + executor.setRemainingTime(timeRemaining); + resumeMachine(null, null); } else if (event != null) { + executor.setRemainingTime(TimeoutState.TIMEOUT); resumeMachine(event.name, event.args); } } - void printState(StringBuilder out) { + @Override + @SuppressWarnings("GuardedBy") + public void writeState(StringBuilder out) { out.append("Enqueued command: ").append(command).append('\n'); out.append("Enqueued events: ").append(eventQueue.size()).append('\n'); @@ -600,19 +560,23 @@ final class ComputerExecutor { private void resumeMachine(@Nullable String event, @Nullable Object[] args) throws InterruptedException { var result = Nullability.assertNonNull(machine).handleEvent(event, args); - interruptedEvent = result.isPause(); - if (!result.isError()) return; - - displayFailure("Error running computer", result.getMessage()); - shutdown(); + if (result.isError()) { + displayFailure("Error running computer", result.getMessage()); + shutdown(); + } else if (result.isPause()) { + wasPaused = true; + timeRemaining = executor.getRemainingTime(); + } else { + wasPaused = false; + } } private enum StateCommand { TURN_ON, SHUTDOWN, REBOOT, - ABORT, - ERROR, + ABORT_WITH_TIMEOUT, + ABORT_WITH_ERROR, } private record Event(String name, @Nullable Object[] args) { diff --git a/projects/core/src/main/java/dan200/computercraft/core/computer/TimeoutState.java b/projects/core/src/main/java/dan200/computercraft/core/computer/TimeoutState.java index c39f23cbd..147c32e61 100644 --- a/projects/core/src/main/java/dan200/computercraft/core/computer/TimeoutState.java +++ b/projects/core/src/main/java/dan200/computercraft/core/computer/TimeoutState.java @@ -5,7 +5,8 @@ package dan200.computercraft.core.computer; import com.google.errorprone.annotations.concurrent.GuardedBy; -import dan200.computercraft.core.computer.computerthread.ComputerThread; +import dan200.computercraft.core.computer.computerthread.ComputerScheduler; +import dan200.computercraft.core.computer.computerthread.ManagedTimeoutState; import dan200.computercraft.core.lua.ILuaMachine; import dan200.computercraft.core.lua.MachineResult; @@ -25,90 +26,54 @@ import java.util.concurrent.TimeUnit; * (namely, throwing a "Too long without yielding" error). *

* Now, if a computer still does not stop after that period, they're behaving really badly. 1.5 seconds after a soft - * abort ({@link #ABORT_TIMEOUT}), we trigger a hard abort (note, this is done from the computer thread manager). This - * will destroy the entire Lua runtime and shut the computer down. + * abort ({@link #ABORT_TIMEOUT}), we trigger a hard abort. This will destroy the entire Lua runtime and shut the + * computer down. *

* The Lua runtime is also allowed to pause execution if there are other computers contesting for work. All computers - * are allowed to run for {@link ComputerThread#scaledPeriod()} nanoseconds (see {@link #currentDeadline}). After that - * period, if any computers are waiting to be executed then we'll set the paused flag to true ({@link #isPaused()}. + * are guaranteed to run for some time. After that period, if any computers are waiting to be executed then we'll set + * the paused flag to true ({@link #isPaused()}. * - * @see ComputerThread + * @see ComputerScheduler + * @see ManagedTimeoutState * @see ILuaMachine * @see MachineResult#isPause() */ -public final class TimeoutState { +public abstract class TimeoutState { /** - * The total time a task is allowed to run before aborting in nanoseconds. + * The time (in nanoseconds) are computer is allowed to run for its long-running tasks, such as startup and + * shutdown. */ - static final long TIMEOUT = TimeUnit.MILLISECONDS.toNanos(7000); + public static final long BASE_TIMEOUT = TimeUnit.SECONDS.toNanos(30); + + /** + * The total time the Lua VM is allowed to run before aborting in nanoseconds. + */ + public static final long TIMEOUT = TimeUnit.MILLISECONDS.toNanos(7000); /** * The time the task is allowed to run after each abort in nanoseconds. */ - static final long ABORT_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(1500); + public static final long ABORT_TIMEOUT = TimeUnit.MILLISECONDS.toNanos(1500); /** * The error message to display when we trigger an abort. */ public static final String ABORT_MESSAGE = "Too long without yielding"; - private final ComputerThread scheduler; @GuardedBy("this") private final List listeners = new ArrayList<>(0); - private boolean paused; - private boolean softAbort; - private volatile boolean hardAbort; - - /** - * When the cumulative time would have started had the whole event been processed in one go. - */ - private long cumulativeStart; - - /** - * How much cumulative time has elapsed. This is effectively {@code cumulativeStart - currentStart}. - */ - private long cumulativeElapsed; - - /** - * When this execution round started. - */ - private long currentStart; - - /** - * When this execution round should look potentially be paused. - */ - private long currentDeadline; - - public TimeoutState(ComputerThread scheduler) { - this.scheduler = scheduler; - } - - long nanoCumulative() { - return System.nanoTime() - cumulativeStart; - } - - long nanoCurrent() { - return System.nanoTime() - currentStart; - } + protected boolean paused; + protected boolean softAbort; + protected volatile boolean hardAbort; /** * Recompute the {@link #isSoftAborted()} and {@link #isPaused()} flags. + *

+ * Normally this will be called automatically by the {@link ComputerScheduler}, but it may be useful to call this + * manually if the most up-to-date information is needed. */ - public synchronized void refresh() { - // Important: The weird arithmetic here is important, as nanoTime may return negative values, and so we - // need to handle overflow. - var now = System.nanoTime(); - var changed = false; - if (!paused && (paused = currentDeadline - now <= 0 && scheduler.hasPendingWork())) { // now >= currentDeadline - changed = true; - } - if (!softAbort && (softAbort = now - cumulativeStart - TIMEOUT >= 0)) { // now - cumulativeStart >= TIMEOUT - changed = true; - } - - if (changed) updateListeners(); - } + public abstract void refresh(); /** * Whether we should pause execution of this machine. @@ -118,7 +83,7 @@ public final class TimeoutState { * * @return Whether we should pause execution. */ - public boolean isPaused() { + public final boolean isPaused() { return paused; } @@ -127,7 +92,7 @@ public final class TimeoutState { * * @return {@code true} if we should throw a timeout error. */ - public boolean isSoftAborted() { + public final boolean isSoftAborted() { return softAbort; } @@ -136,64 +101,22 @@ public final class TimeoutState { * * @return {@code true} if the machine should be forcibly shut down. */ - public boolean isHardAborted() { + public final boolean isHardAborted() { return hardAbort; } - /** - * If the machine should be forcibly aborted. - */ - void hardAbort() { - softAbort = hardAbort = true; - synchronized (this) { - updateListeners(); - } - } - - /** - * Start the current and cumulative timers again. - */ - void startTimer() { - var now = System.nanoTime(); - currentStart = now; - currentDeadline = now + scheduler.scaledPeriod(); - // Compute the "nominal start time". - cumulativeStart = now - cumulativeElapsed; - } - - /** - * Pauses the cumulative time, to be resumed by {@link #startTimer()}. - * - * @see #nanoCumulative() - */ - synchronized void pauseTimer() { - // We set the cumulative time to difference between current time and "nominal start time". - cumulativeElapsed = System.nanoTime() - cumulativeStart; - paused = false; - updateListeners(); - } - - /** - * Resets the cumulative time and resets the abort flags. - */ - synchronized void stopTimer() { - cumulativeElapsed = 0; - paused = softAbort = hardAbort = false; - updateListeners(); - } - @GuardedBy("this") - private void updateListeners() { + protected final void updateListeners() { for (var listener : listeners) listener.run(); } - public synchronized void addListener(Runnable listener) { + public final synchronized void addListener(Runnable listener) { Objects.requireNonNull(listener, "listener cannot be null"); listeners.add(listener); listener.run(); } - public synchronized void removeListener(Runnable listener) { + public final synchronized void removeListener(Runnable listener) { Objects.requireNonNull(listener, "listener cannot be null"); listeners.remove(listener); } diff --git a/projects/core/src/main/java/dan200/computercraft/core/computer/computerthread/ComputerScheduler.java b/projects/core/src/main/java/dan200/computercraft/core/computer/computerthread/ComputerScheduler.java new file mode 100644 index 000000000..3c5ee827a --- /dev/null +++ b/projects/core/src/main/java/dan200/computercraft/core/computer/computerthread/ComputerScheduler.java @@ -0,0 +1,127 @@ +// SPDX-FileCopyrightText: 2023 The CC: Tweaked Developers +// +// SPDX-License-Identifier: MPL-2.0 + +package dan200.computercraft.core.computer.computerthread; + +import dan200.computercraft.core.computer.TimeoutState; +import dan200.computercraft.core.metrics.MetricsObserver; + +import java.util.concurrent.TimeUnit; + +/** + * The {@link ComputerScheduler} is responsible for executing computers on the computer thread(s). + *

+ * This handles both scheduling the computers for work across multiple threads, as well as {@linkplain TimeoutState timing out} + * or pausing the computer if they execute for too long. + *

+ * This API is composed of two interfaces, a {@link Worker} and {@link Executor}. The {@link ComputerScheduler} + * implementation will supply an {@link Executor}, while consuming classes should implement {@link Worker}. + *

+ * In practice, this interface is only implemented by {@link ComputerThread} (and consumed by {@link dan200.computercraft.core.computer.ComputerExecutor}), + * however this interface is useful to enforce separation of the two. + * + * @see ManagedTimeoutState + */ +public interface ComputerScheduler { + Executor createExecutor(Worker worker, MetricsObserver metrics); + + boolean stop(long timeout, TimeUnit unit) throws InterruptedException; + + /** + * The {@link Executor} holds the state of a {@link Worker} within the scheduler. + *

+ * This is used to schedule the worker for execution, as well as providing some additional control over the + * {@link TimeoutState}. + */ + interface Executor { + /** + * Submit the executor to the scheduler, marking it as ready {@linkplain Worker#work() to run some work}. + *

+ * This function is idempotent - if the executor is already queued, nothing will happen. + */ + void submit(); + + /** + * Get the executor's {@link TimeoutState}. + * + * @return The executor's timeout state. + */ + TimeoutState timeoutState(); + + /** + * Get the amount of time this computer can run for before being interrupted. + *

+ * This value starts off as {@link TimeoutState#BASE_TIMEOUT}, but may be reduced by + * {@link #setRemainingTime(long)}. + * + * @return The time this computer can run for being interrupted. + * @see #getRemainingTime() + */ + long getRemainingTime(); + + /** + * Set the amount of this computer can execute for before being interrupted. + *

+ * This value will typically be {@link TimeoutState#TIMEOUT}, but may be a previous value of + * {@link #getRemainingTime()} if the computer is resuming after {@linkplain TimeoutState#isPaused() being + * paused}. + * + * @param time The time this computer can execute for. + * @see #getRemainingTime() + */ + void setRemainingTime(long time); + } + + /** + * A {@link Worker} is responsible for actually running the computer's code. + *

+ * his handles {@linkplain Worker#work() running the actual computer logic}, as well as providing some additional + * control methods. + *

+ * This should be implemented by the consuming class. + */ + interface Worker { + /** + * Perform any work that the computer needs to do, for instance turning on, shutting down or actually running + * code. + *

+ * If the computer needs to run immediately again, it should call {@link Executor#submit()} within this method. + * + * @throws InterruptedException If the computer has run for too long and must be terminated. + */ + void work() throws InterruptedException; + + /** + * Get the ID of this computer, used in error messages. + * + * @return This computers ID. + */ + int getComputerID(); + + /** + * Write any useful debugging information computer to the provided buffer. This is used in log messages when the + * computer has run for too long. + * + * @param output The buffer to write to. + */ + void writeState(StringBuilder output); + + /** + * Abort this whole computer due to a timeout. + */ + void abortWithTimeout(); + + /** + * Abort this whole computer due to some internal error. + */ + void abortWithError(); + + /** + * "Unload" this computer, shutting it down and preventing it from running again. + *

+ * This is called by the scheduler when {@linkplain ComputerScheduler#stop(long, TimeUnit) it is stopped.} + */ + void unload(); + } +} diff --git a/projects/core/src/main/java/dan200/computercraft/core/computer/computerthread/ComputerThread.java b/projects/core/src/main/java/dan200/computercraft/core/computer/computerthread/ComputerThread.java index 17c0654c3..7f7821fd8 100644 --- a/projects/core/src/main/java/dan200/computercraft/core/computer/computerthread/ComputerThread.java +++ b/projects/core/src/main/java/dan200/computercraft/core/computer/computerthread/ComputerThread.java @@ -5,25 +5,23 @@ package dan200.computercraft.core.computer.computerthread; import com.google.common.annotations.VisibleForTesting; -import com.google.errorprone.annotations.concurrent.GuardedBy; import dan200.computercraft.core.ComputerContext; import dan200.computercraft.core.Logging; -import dan200.computercraft.core.computer.ComputerExecutor; import dan200.computercraft.core.computer.TimeoutState; +import dan200.computercraft.core.metrics.Metrics; +import dan200.computercraft.core.metrics.MetricsObserver; import dan200.computercraft.core.util.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; import java.util.Objects; import java.util.TreeSet; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; @@ -31,9 +29,9 @@ import java.util.concurrent.locks.ReentrantLock; /** * Runs all scheduled tasks for computers in a {@link ComputerContext}. *

- * This acts as an over-complicated {@link ThreadPoolExecutor}: It creates several {@link Worker} threads which pull - * tasks from a shared queue, executing them. It also creates a single {@link Monitor} thread, which updates computer - * timeouts, killing workers if they have not been terminated by {@link TimeoutState#isSoftAborted()}. + * This acts as an over-complicated {@link ThreadPoolExecutor}: It creates several {@linkplain WorkerThread worker + * threads} which pull tasks from a shared queue, executing them. It also creates a single {@link Monitor} thread, which + * updates computer timeouts, killing workers if they have not been terminated by {@link TimeoutState#isSoftAborted()}. *

* Computers are executed using a priority system, with those who have spent less time executing having a higher * priority than those hogging the thread. This, combined with {@link TimeoutState#isPaused()} means we can reduce the @@ -41,20 +39,16 @@ import java.util.concurrent.locks.ReentrantLock; *

* This is done using an implementation of Linux's Completely Fair Scheduler. When a computer executes, we compute what * share of execution time it has used (time executed/number of tasks). We then pick the computer who has the least - * "virtual execution time" (aka {@link ComputerExecutor#virtualRuntime}). + * "virtual execution time" (aka {@link ExecutorImpl#virtualRuntime}). *

* When adding a computer to the queue, we make sure its "virtual runtime" is at least as big as the smallest runtime. * This means that adding computers which have slept a lot do not then have massive priority over everyone else. See - * {@link #queue(ComputerExecutor)} for how this is implemented. + * {@link #queue(ExecutorImpl)} for how this is implemented. *

* In reality, it's unlikely that more than a few computers are waiting to execute at once, so this will not have much * effect unless you have a computer hogging execution time. However, it is pretty effective in those situations. - * - * @see TimeoutState For how hard timeouts are handled. - * @see ComputerExecutor For how computers actually do execution. */ -@SuppressWarnings("GuardedBy") // FIXME: Hard to know what the correct thing to do is. -public final class ComputerThread { +public final class ComputerThread implements ComputerScheduler { private static final Logger LOG = LoggerFactory.getLogger(ComputerThread.class); /** @@ -98,7 +92,7 @@ public final class ComputerThread { /** * Time difference between reporting crashed threads. * - * @see Worker#reportTimeout(ComputerExecutor, long) + * @see WorkerThread#reportTimeout(ExecutorImpl, long) */ private static final long REPORT_DEBOUNCE = TimeUnit.SECONDS.toNanos(1); @@ -125,7 +119,7 @@ public final class ComputerThread { * The array of current workers, and their owning threads. */ @GuardedBy("threadLock") - private final Worker[] workers; + private final WorkerThread[] workers; /** * The number of workers in {@link #workers}. @@ -139,29 +133,33 @@ public final class ComputerThread { private final long minPeriod; private final ReentrantLock computerLock = new ReentrantLock(); - private final Condition workerWakeup = computerLock.newCondition(); - private final Condition monitorWakeup = computerLock.newCondition(); + private final @GuardedBy("computerLock") Condition workerWakeup = computerLock.newCondition(); + private final @GuardedBy("computerLock") Condition monitorWakeup = computerLock.newCondition(); private final AtomicInteger idleWorkers = new AtomicInteger(0); /** * Active queues to execute. */ - private final TreeSet computerQueue = new TreeSet<>((a, b) -> { + @GuardedBy("computerLock") + private final TreeSet computerQueue = new TreeSet<>(ComputerThread::compareExecutors); + + @SuppressWarnings("GuardedBy") + private static int compareExecutors(ExecutorImpl a, ExecutorImpl b) { if (a == b) return 0; // Should never happen, but let's be consistent here long at = a.virtualRuntime, bt = b.virtualRuntime; if (at == bt) return Integer.compare(a.hashCode(), b.hashCode()); return at < bt ? -1 : 1; - }); + } /** - * The minimum {@link ComputerExecutor#virtualRuntime} time on the tree. + * The minimum {@link ExecutorImpl#virtualRuntime} time on the tree. */ private long minimumVirtualRuntime = 0; public ComputerThread(int threadCount) { - workers = new Worker[threadCount]; + workers = new WorkerThread[threadCount]; // latency and minPeriod are scaled by 1 + floor(log2(threads)). We can afford to execute tasks for // longer when executing on more than one thread. @@ -170,13 +168,28 @@ public final class ComputerThread { minPeriod = DEFAULT_MIN_PERIOD * factor; } + @Override + public Executor createExecutor(ComputerScheduler.Worker worker, MetricsObserver metrics) { + return new ExecutorImpl(worker, metrics); + } + @GuardedBy("threadLock") private void addWorker(int index) { LOG.trace("Spawning new worker {}.", index); - (workers[index] = new Worker(index)).owner.start(); + (workers[index] = new WorkerThread(index)).owner.start(); workerCount++; } + @SuppressWarnings("GuardedBy") + private int workerCount() { + return workerCount; + } + + @SuppressWarnings("GuardedBy") + private WorkerThread[] workersReadOnly() { + return workers; + } + /** * Ensure sufficient workers are running. */ @@ -184,7 +197,7 @@ public final class ComputerThread { private void ensureRunning() { // Don't even enter the lock if we've a monitor and don't need to/can't spawn an additional worker. // We'll be holding the computer lock at this point, so there's no problems with idleWorkers being wrong. - if (monitor != null && (idleWorkers.get() > 0 || workerCount == workers.length)) return; + if (monitor != null && (idleWorkers.get() > 0 || workerCount() == workersReadOnly().length)) return; threadLock.lock(); try { @@ -219,6 +232,7 @@ public final class ComputerThread { * @return Whether the thread was successfully shut down. * @throws InterruptedException If interrupted while waiting. */ + @Override public boolean stop(long timeout, TimeUnit unit) throws InterruptedException { advanceState(STOPPING); @@ -271,12 +285,12 @@ public final class ComputerThread { /** * Mark a computer as having work, enqueuing it on the thread. *

- * You must be holding {@link ComputerExecutor}'s {@code queueLock} when calling this method - it should only + * You must be holding {@link ExecutorImpl}'s {@code queueLock} when calling this method - it should only * be called from {@code enqueue}. * * @param executor The computer to execute work on. */ - void queue(ComputerExecutor executor) { + void queue(ExecutorImpl executor) { computerLock.lock(); try { if (state.get() != RUNNING) throw new IllegalStateException("ComputerThread is no longer running"); @@ -284,9 +298,6 @@ public final class ComputerThread { // Ensure we've got a worker running. ensureRunning(); - if (executor.onComputerQueue) throw new IllegalStateException("Cannot queue already queued executor"); - executor.onComputerQueue = true; - updateRuntimes(null); // We're not currently on the queue, so update its current execution time to @@ -318,14 +329,15 @@ public final class ComputerThread { /** - * Update the {@link ComputerExecutor#virtualRuntime}s of all running tasks, and then update the + * Update the {@link ExecutorImpl#virtualRuntime}s of all running tasks, and then update the * {@link #minimumVirtualRuntime} based on the current tasks. *

* This is called before queueing tasks, to ensure that {@link #minimumVirtualRuntime} is up-to-date. * * @param current The machine which we updating runtimes from. */ - private void updateRuntimes(@Nullable ComputerExecutor current) { + @GuardedBy("computerLock") + private void updateRuntimes(@Nullable ExecutorImpl current) { var minRuntime = Long.MAX_VALUE; // If we've a task on the queue, use that as our base time. @@ -334,7 +346,7 @@ public final class ComputerThread { // Update all the currently executing tasks var now = System.nanoTime(); var tasks = 1 + computerQueue.size(); - for (@Nullable var runner : workers) { + for (@Nullable var runner : workersReadOnly()) { if (runner == null) continue; var executor = runner.currentExecutor.get(); if (executor == null) continue; @@ -359,22 +371,9 @@ public final class ComputerThread { * Ensure the "currently working" state of the executor is reset, the timings are updated, and then requeue the * executor if needed. * - * @param runner The runner this task was on. * @param executor The executor to requeue */ - private void afterWork(Worker runner, ComputerExecutor executor) { - // Clear the executor's thread. - var currentThread = executor.executingThread.getAndSet(null); - if (currentThread != runner.owner) { - - LOG.error( - "Expected computer #{} to be running on {}, but already running on {}. This is a SERIOUS bug, please report with your debug.log.", - executor.getComputer().getID(), - runner.owner.getName(), - currentThread == null ? "nothing" : currentThread.getName() - ); - } - + private void afterWork(ExecutorImpl executor) { computerLock.lock(); try { updateRuntimes(executor); @@ -390,6 +389,13 @@ public final class ComputerThread { } } + @SuppressWarnings("GuardedBy") + private int computerQueueSize() { + // FIXME: We access this on other threads (in TimeoutState), so their reads won't be consistent. This isn't + // "critical" behaviour, so not clear if it matters too much. + return computerQueue.size(); + } + /** * The scaled period for a single task. * @@ -399,11 +405,8 @@ public final class ComputerThread { * @see #LATENCY_MAX_TASKS */ long scaledPeriod() { - // FIXME: We access this on other threads (in TimeoutState), so their reads won't be consistent. This isn't - // "critical" behaviour, so not clear if it matters too much. - // +1 to include the current task - var count = 1 + computerQueue.size(); + var count = 1 + computerQueueSize(); return count < LATENCY_MAX_TASKS ? latency / count : minPeriod; } @@ -413,9 +416,8 @@ public final class ComputerThread { * @return If we have work queued up. */ @VisibleForTesting - public boolean hasPendingWork() { - // FIXME: See comment in scaledPeriod. Again, we access this in multiple threads but not clear if it matters! - return !computerQueue.isEmpty(); + boolean hasPendingWork() { + return computerQueueSize() > 0; } /** @@ -424,12 +426,11 @@ public final class ComputerThread { * * @return If the computer threads are busy. */ - @GuardedBy("computerLock") private boolean isBusy() { - return computerQueue.size() > idleWorkers.get(); + return computerQueueSize() > idleWorkers.get(); } - private void workerFinished(Worker worker) { + private void workerFinished(WorkerThread worker) { // We should only shut down a worker once! This should only happen if we fail to abort a worker and then the // worker finishes normally. if (!worker.running.getAndSet(false)) return; @@ -444,6 +445,7 @@ public final class ComputerThread { workerCount--; if (workers[worker.index] != worker) { + assert false : "workerFinished but inconsistent worker"; LOG.error("Worker {} closed, but new runner has been spawned.", worker.index); } else if (state.get() == RUNNING || (state.get() == STOPPING && hasPendingWork())) { addWorker(worker.index); @@ -457,7 +459,7 @@ public final class ComputerThread { } /** - * Observes all currently active {@link Worker}s and terminates their tasks once they have exceeded the hard + * Observes all currently active {@link WorkerThread}s and terminates their tasks once they have exceeded the hard * abort limit. * * @see TimeoutState @@ -493,7 +495,7 @@ public final class ComputerThread { } private void checkRunners() { - for (@Nullable var runner : workers) { + for (@Nullable var runner : workersReadOnly()) { if (runner == null) continue; // If the worker has no work, skip @@ -505,25 +507,28 @@ public final class ComputerThread { // If we're still within normal execution times (TIMEOUT) or soft abort (ABORT_TIMEOUT), // then we can let the Lua machine do its work. - var afterStart = executor.timeout.nanoCumulative(); - var afterHardAbort = afterStart - TimeoutState.TIMEOUT - TimeoutState.ABORT_TIMEOUT; + var remainingTime = executor.timeout.getRemainingTime(); + // If remainingTime > 0, then we're executing normally, + // If remainingTime > -ABORT_TIMEOUT, then we've soft aborted. + // Otherwise, remainingTime <= -ABORT_TIMEOUT, and we've run over by -ABORT_TIMEOUT - remainingTime. + var afterHardAbort = -remainingTime - TimeoutState.ABORT_TIMEOUT; if (afterHardAbort < 0) continue; // Set the hard abort flag. executor.timeout.hardAbort(); - executor.abort(); + executor.worker.abortWithTimeout(); if (afterHardAbort >= TimeoutState.ABORT_TIMEOUT * 2) { // If we've hard aborted and interrupted, and we're still not dead, then mark the worker // as dead, finish off the task, and spawn a new runner. - runner.reportTimeout(executor, afterStart); + runner.reportTimeout(executor, remainingTime); runner.owner.interrupt(); workerFinished(runner); } else if (afterHardAbort >= TimeoutState.ABORT_TIMEOUT) { // If we've hard aborted but we're still not dead, dump the stack trace and interrupt // the task. - runner.reportTimeout(executor, afterStart); + runner.reportTimeout(executor, remainingTime); runner.owner.interrupt(); } } @@ -533,11 +538,11 @@ public final class ComputerThread { /** * Pulls tasks from the {@link #computerQueue} queue and runs them. *

- * This is responsible for running the {@link ComputerExecutor#work()}, {@link ComputerExecutor#beforeWork()} and - * {@link ComputerExecutor#afterWork()} functions. Everything else is either handled by the executor, timeout - * state or monitor. + * This is responsible for running the {@link ComputerScheduler.Worker#work()}, {@link ExecutorImpl#beforeWork()} + * and {@link ExecutorImpl#afterWork()} functions. Everything else is either handled by the executor, + * timeout state or monitor. */ - private final class Worker implements Runnable { + private final class WorkerThread implements Runnable { /** * The index into the {@link #workers} array. */ @@ -552,21 +557,21 @@ public final class ComputerThread { * Whether this runner is currently executing. This may be set to false when this worker terminates, or when * we try to abandon a worker in the monitor * - * @see #workerFinished(Worker) + * @see #workerFinished(WorkerThread) */ final AtomicBoolean running = new AtomicBoolean(true); /** * The computer we're currently running. */ - final AtomicReference currentExecutor = new AtomicReference<>(null); + final AtomicReference currentExecutor = new AtomicReference<>(null); /** * The last time we reported a stack trace, used to avoid spamming the logs. */ AtomicLong lastReport = new AtomicLong(Long.MIN_VALUE); - Worker(int index) { + WorkerThread(int index) { this.index = index; owner = workerFactory.newThread(this); } @@ -581,10 +586,9 @@ public final class ComputerThread { } private void runImpl() { - tasks: while (running.get()) { // Wait for an active queue to execute - ComputerExecutor executor; + ExecutorImpl executor; computerLock.lock(); try { idleWorkers.getAndIncrement(); @@ -599,21 +603,18 @@ public final class ComputerThread { computerLock.unlock(); } - // If we're trying to executing some task on this computer while someone else is doing work, something - // is seriously wrong. - while (!executor.executingThread.compareAndSet(null, owner)) { - var existing = executor.executingThread.get(); - if (existing != null) { - LOG.error( - "Trying to run computer #{} on thread {}, but already running on {}. This is a SERIOUS bug, please report with your debug.log.", - executor.getComputer().getID(), owner.getName(), existing.getName() - ); - continue tasks; - } + // Mark this computer as executing. + if (!ExecutorImpl.STATE.compareAndSet(executor, ExecutorState.ON_QUEUE, ExecutorState.RUNNING)) { + assert false : "Running computer on the wrong thread"; + LOG.error( + "Trying to run computer #{} on thread {}, but already running on another thread. This is a SERIOUS " + + "bug, please report with your debug.log.", + executor.worker.getComputerID(), owner.getName() + ); } // If we're stopping, the only thing this executor should be doing is shutting down. - if (state.get() >= STOPPING) executor.queueStop(false, true); + if (state.get() >= STOPPING) executor.worker.unload(); // Reset the timers executor.beforeWork(); @@ -624,19 +625,19 @@ public final class ComputerThread { // Execute the task try { - executor.work(); + executor.worker.work(); } catch (Exception | LinkageError | VirtualMachineError e) { - LOG.error("Error running task on computer #" + executor.getComputer().getID(), e); + LOG.error("Error running task on computer #" + executor.worker.getComputerID(), e); // Tear down the computer immediately. There's no guarantee it's well-behaved from now on. - executor.fastFail(); + executor.worker.abortWithError(); } finally { var thisExecutor = currentExecutor.getAndSet(null); - if (thisExecutor != null) afterWork(this, executor); + if (thisExecutor != null) afterWork(executor); } } } - private void reportTimeout(ComputerExecutor executor, long time) { + private void reportTimeout(ExecutorImpl executor, long time) { if (!LOG.isErrorEnabled(Logging.COMPUTER_ERROR)) return; // Attempt to debounce stack trace reporting, limiting ourselves to one every second. There's no need to be @@ -649,8 +650,8 @@ public final class ComputerThread { var owner = Objects.requireNonNull(this.owner); var builder = new StringBuilder() - .append("Terminating computer #").append(executor.getComputer().getID()) - .append(" due to timeout (running for ").append(time * 1e-9) + .append("Terminating computer #").append(executor.worker.getComputerID()) + .append(" due to timeout (ran over by ").append(time * -1e-9) .append(" seconds). This is NOT a bug, but may mean a computer is misbehaving.\n") .append("Thread ") .append(owner.getName()) @@ -664,9 +665,150 @@ public final class ComputerThread { builder.append(" at ").append(element).append('\n'); } - executor.printState(builder); + executor.worker.writeState(builder); LOG.warn(builder.toString()); } } + + /** + * The current state of a {@link ExecutorState}. + *

+ * Executors are either enqueued (have more work to do) or not and working or not. This enum encapsulates the four + * combinations of these properties, with the following transitions: + * + *

{@code
+     *            submit()            afterWork()
+     *      IDLE ---------> ON_QUEUE <----------- REPEAT
+     *       ^                  |                   ^
+     *       |                  | runImpl()         |
+     *       |                  V                   |
+     *       +---------------RUNNING----------------+
+     *   afterWork()                  submit()
+     * }
+ */ + enum ExecutorState { + /** + * This executor is idle. + */ + IDLE, + + /** + * This executor is on the queue but idle. + */ + ON_QUEUE, + + /** + * This executor is running and will transition to idle after execution. + */ + RUNNING, + + /** + * This executor is running and should run again after this task finishes. + */ + REPEAT; + + ExecutorState enqueue() { + return switch (this) { + case IDLE, ON_QUEUE -> ON_QUEUE; + case RUNNING, REPEAT -> REPEAT; + }; + } + + ExecutorState requeue() { + return switch (this) { + case IDLE, ON_QUEUE -> { + assert false : "Impossible state after executing"; + LOG.error("Impossible state - calling requeue with {}.", this); + yield ExecutorState.ON_QUEUE; + } + case RUNNING -> ExecutorState.IDLE; + case REPEAT -> ExecutorState.ON_QUEUE; + }; + } + } + + private final class ExecutorImpl implements Executor { + public static final AtomicReferenceFieldUpdater STATE = AtomicReferenceFieldUpdater.newUpdater( + ExecutorImpl.class, ExecutorState.class, "$state" + ); + + final Worker worker; + private final MetricsObserver metrics; + final TimeoutImpl timeout; + + /** + * The current state of this worker. + */ + private volatile ExecutorState $state = ExecutorState.IDLE; + + /** + * The amount of time this computer has used on a theoretical machine which shares work evenly amongst computers. + * + * @see ComputerThread + */ + long virtualRuntime = 0; + + /** + * The last time at which we updated {@link #virtualRuntime}. + * + * @see ComputerThread + */ + long vRuntimeStart; + + ExecutorImpl(Worker worker, MetricsObserver metrics) { + this.worker = worker; + this.metrics = metrics; + timeout = new TimeoutImpl(); + } + + /** + * Called before calling {@link Worker#work()}, setting up any important state. + */ + void beforeWork() { + vRuntimeStart = System.nanoTime(); + timeout.startTimer(scaledPeriod()); + } + + /** + * Called after executing {@link Worker#work()}. + * + * @return If we have more work to do. + */ + boolean afterWork() { + timeout.reset(); + metrics.observe(Metrics.COMPUTER_TASKS, timeout.getExecutionTime()); + + var state = STATE.getAndUpdate(this, ExecutorState::requeue); + return state == ExecutorState.REPEAT; + } + + @Override + public void submit() { + var state = STATE.getAndUpdate(this, ExecutorState::enqueue); + if (state == ExecutorState.IDLE) queue(this); + } + + @Override + public TimeoutState timeoutState() { + return timeout; + } + + @Override + public long getRemainingTime() { + return timeout.getRemainingTime(); + } + + @Override + public void setRemainingTime(long time) { + timeout.setRemainingTime(time); + } + } + + private final class TimeoutImpl extends ManagedTimeoutState { + @Override + protected boolean shouldPause() { + return hasPendingWork(); + } + } } diff --git a/projects/core/src/main/java/dan200/computercraft/core/computer/computerthread/ManagedTimeoutState.java b/projects/core/src/main/java/dan200/computercraft/core/computer/computerthread/ManagedTimeoutState.java new file mode 100644 index 000000000..2f1c9d181 --- /dev/null +++ b/projects/core/src/main/java/dan200/computercraft/core/computer/computerthread/ManagedTimeoutState.java @@ -0,0 +1,127 @@ +// SPDX-FileCopyrightText: 2019 The CC: Tweaked Developers +// +// SPDX-License-Identifier: MPL-2.0 + +package dan200.computercraft.core.computer.computerthread; + +import dan200.computercraft.core.computer.TimeoutState; + +/** + * A basic {@link TimeoutState} implementation, for use by {@link ComputerScheduler} implementations. + *

+ * While almost all {@link TimeoutState} implementations will be derived from this class, the two are intentionally kept + * separate. This class is intended for the {@link ComputerScheduler} (which is responsible for controlling the + * timeout), and not for the main computer logic, which only needs to check timeout flags. + *

+ * This class tracks the time a computer was started (and thus {@linkplain #getExecutionTime()} how long it has been + * running for), as well as the deadline for when a computer should be soft aborted and paused. + */ +public abstract class ManagedTimeoutState extends TimeoutState { + /** + * When execution of this computer started. + * + * @see #getExecutionTime() + */ + private long startTime; + + /** + * The time when this computer should be aborted. + * + * @see #getRemainingTime() + * @see #setRemainingTime(long) + */ + private long abortDeadline; + + /** + * The time when this computer should be paused if {@link ComputerThread#hasPendingWork()} is set. + */ + private long pauseDeadline; + + @Override + public final synchronized void refresh() { + // Important: The weird arithmetic here is important, as nanoTime may return negative values, and so we + // need to handle overflow. + var now = System.nanoTime(); + var changed = false; + if (!paused && Long.compareUnsigned(now, pauseDeadline) >= 0 && shouldPause()) { // now >= currentDeadline + paused = true; + changed = true; + } + if (!softAbort && Long.compareUnsigned(now, abortDeadline) >= 0) { // now >= currentAbort + softAbort = true; + changed = true; + } + if (softAbort && !hardAbort && Long.compareUnsigned(now, abortDeadline + ABORT_TIMEOUT) >= 0) { // now >= currentAbort + ABORT_TIMEOUT. + hardAbort = true; + changed = true; + } + + if (changed) updateListeners(); + } + + /** + * Get how long this computer has been executing for. + * + * @return How long the computer has been running for in nanoseconds. + */ + public final long getExecutionTime() { + return System.nanoTime() - startTime; + } + + /** + * Get how long this computer is permitted to run before being aborted. + * + * @return The remaining time, in nanoseconds. + * @see ComputerScheduler.Executor#getRemainingTime() + */ + public final long getRemainingTime() { + return abortDeadline - System.nanoTime(); + } + + /** + * Set how long this computer is permitted to run before being aborted. + * + * @param time The remaining time, in nanoseconds. + * @see ComputerScheduler.Executor#setRemainingTime(long) + */ + public final void setRemainingTime(long time) { + abortDeadline = startTime + time; + } + + /** + * Set the hard-abort flag immediately. + */ + public final void hardAbort() { + softAbort = hardAbort = true; + synchronized (this) { + updateListeners(); + } + } + + /** + * Start this timer, recording the current start time, and deadline before a computer may be paused. + * + * @param pauseTimeout The minimum time this computer can run before potentially being paused. + */ + public final synchronized void startTimer(long pauseTimeout) { + var now = System.nanoTime(); + startTime = now; + abortDeadline = now + BASE_TIMEOUT; + pauseDeadline = now + pauseTimeout; + } + + /** + * Clear the paused and abort flags. + */ + public final synchronized void reset() { + paused = softAbort = hardAbort = false; + updateListeners(); + } + + /** + * Determine if this computer should be paused, as other computers are contending for work. + * + * @return If this computer should be paused. + */ + protected abstract boolean shouldPause(); +} diff --git a/projects/core/src/test/java/dan200/computercraft/core/computer/computerthread/ComputerThreadRunner.java b/projects/core/src/test/java/dan200/computercraft/core/computer/computerthread/ComputerThreadRunner.java new file mode 100644 index 000000000..4106e85e0 --- /dev/null +++ b/projects/core/src/test/java/dan200/computercraft/core/computer/computerthread/ComputerThreadRunner.java @@ -0,0 +1,142 @@ +// SPDX-FileCopyrightText: 2023 The CC: Tweaked Developers +// +// SPDX-License-Identifier: MPL-2.0 + +package dan200.computercraft.core.computer.computerthread; + +import dan200.computercraft.core.computer.TimeoutState; +import dan200.computercraft.core.metrics.MetricsObserver; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +import javax.annotation.concurrent.GuardedBy; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.BiConsumer; + +public class ComputerThreadRunner implements AutoCloseable { + private final ComputerThread thread; + + private final Lock errorLock = new ReentrantLock(); + private final @GuardedBy("errorLock") Condition hasError = errorLock.newCondition(); + @GuardedBy("errorLock") + private @MonotonicNonNull Throwable error = null; + + public ComputerThreadRunner() { + this.thread = new ComputerThread(1); + } + + public ComputerThread thread() { + return thread; + } + + @Override + public void close() { + try { + if (!thread.stop(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("Failed to shutdown ComputerContext in time."); + } + } catch (InterruptedException e) { + throw new IllegalStateException("Runtime thread was interrupted", e); + } + } + + public Worker createWorker(BiConsumer action) { + return new Worker(thread, e -> action.accept(e, e.timeoutState())); + } + + public void createLoopingComputer() { + new Worker(thread, e -> { + Thread.sleep(100); + e.submit(); + }).executor().submit(); + } + + public void startAndWait(Worker worker) throws Exception { + worker.executor().submit(); + do { + errorLock.lock(); + try { + rethrowIfNeeded(); + if (hasError.await(100, TimeUnit.MILLISECONDS)) rethrowIfNeeded(); + } finally { + errorLock.unlock(); + } + } while (worker.executed == 0 || thread.hasPendingWork()); + } + + @GuardedBy("errorLock") + private void rethrowIfNeeded() throws Exception { + if (error != null) ComputerThreadRunner.throwUnchecked0(error); + } + + @SuppressWarnings("unchecked") + private static void throwUnchecked0(Throwable t) throws T { + throw (T) t; + } + + @FunctionalInterface + private interface Task { + void run(ComputerScheduler.Executor executor) throws InterruptedException; + } + + public final class Worker implements ComputerScheduler.Worker { + private final Task run; + private final ComputerScheduler.Executor executor; + volatile int executed = 0; + + private Worker(ComputerScheduler scheduler, Task run) { + this.run = run; + this.executor = scheduler.createExecutor(this, MetricsObserver.discard()); + } + + public ComputerScheduler.Executor executor() { + return executor; + } + + @Override + public void work() { + try { + run.run(executor); + executed++; + } catch (Throwable e) { + errorLock.lock(); + try { + if (error == null) { + error = e; + hasError.signal(); + } else { + error.addSuppressed(e); + } + } finally { + errorLock.unlock(); + } + + if (e instanceof Exception || e instanceof AssertionError) return; + throwUnchecked0(e); + } + } + + @Override + public int getComputerID() { + return 0; + } + + @Override + public void writeState(StringBuilder output) { + } + + @Override + public void abortWithTimeout() { + } + + @Override + public void unload() { + } + + @Override + public void abortWithError() { + } + } +} diff --git a/projects/core/src/test/java/dan200/computercraft/core/computer/computerthread/ComputerThreadTest.java b/projects/core/src/test/java/dan200/computercraft/core/computer/computerthread/ComputerThreadTest.java index c29b130c1..08b3933d4 100644 --- a/projects/core/src/test/java/dan200/computercraft/core/computer/computerthread/ComputerThreadTest.java +++ b/projects/core/src/test/java/dan200/computercraft/core/computer/computerthread/ComputerThreadTest.java @@ -4,9 +4,8 @@ package dan200.computercraft.core.computer.computerthread; -import dan200.computercraft.core.lua.MachineResult; +import dan200.computercraft.core.computer.TimeoutState; import dan200.computercraft.test.core.ConcurrentHelpers; -import dan200.computercraft.test.core.computer.KotlinComputerManager; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -28,11 +27,11 @@ import static org.junit.jupiter.api.Assertions.*; @Execution(ExecutionMode.CONCURRENT) public class ComputerThreadTest { private static final Logger LOG = LoggerFactory.getLogger(ComputerThreadTest.class); - private KotlinComputerManager manager; + private ComputerThreadRunner manager; @BeforeEach public void before() { - manager = new KotlinComputerManager(); + manager = new ComputerThreadRunner(); } @AfterEach @@ -42,16 +41,13 @@ public class ComputerThreadTest { @Test public void testSoftAbort() throws Exception { - var computer = manager.create(); - manager.enqueue(computer, timeout -> { + var computer = manager.createWorker((executor, timeout) -> { + executor.setRemainingTime(TimeoutState.TIMEOUT); assertFalse(timeout.isSoftAborted(), "Should not start soft-aborted"); var delay = ConcurrentHelpers.waitUntil(timeout::isSoftAborted); assertThat("Should be soft aborted", delay * 1e-9, closeTo(7, 1.0)); LOG.info("Slept for {}", delay); - - computer.shutdown(); - return MachineResult.OK; }); manager.startAndWait(computer); @@ -59,15 +55,12 @@ public class ComputerThreadTest { @Test public void testHardAbort() throws Exception { - var computer = manager.create(); - manager.enqueue(computer, timeout -> { + var computer = manager.createWorker((executor, timeout) -> { + executor.setRemainingTime(TimeoutState.TIMEOUT); assertFalse(timeout.isHardAborted(), "Should not start soft-aborted"); assertThrows(InterruptedException.class, () -> Thread.sleep(11_000), "Sleep should be hard aborted"); assertTrue(timeout.isHardAborted(), "Thread should be hard aborted"); - - computer.shutdown(); - return MachineResult.OK; }); manager.startAndWait(computer); @@ -75,13 +68,9 @@ public class ComputerThreadTest { @Test public void testNoPauseIfNoOtherMachines() throws Exception { - var computer = manager.create(); - manager.enqueue(computer, timeout -> { + var computer = manager.createWorker((executor, timeout) -> { var didPause = ConcurrentHelpers.waitUntil(timeout::isPaused, 5, TimeUnit.SECONDS); assertFalse(didPause, "Machine shouldn't have paused within 5s"); - - computer.shutdown(); - return MachineResult.OK; }); manager.startAndWait(computer); @@ -89,18 +78,14 @@ public class ComputerThreadTest { @Test public void testPauseIfSomeOtherMachine() throws Exception { - var computer = manager.create(); - manager.enqueue(computer, timeout -> { - var budget = manager.context().computerScheduler().scaledPeriod(); + var computer = manager.createWorker((executor, timeout) -> { + var budget = manager.thread().scaledPeriod(); assertEquals(budget, TimeUnit.MILLISECONDS.toNanos(25), "Budget should be 25ms"); var delay = ConcurrentHelpers.waitUntil(timeout::isPaused); // Linux appears to have much more accurate timing than Windows/OSX. Or at least on CI! var time = System.getProperty("os.name", "").toLowerCase(Locale.ROOT).contains("linux") ? 0.05 : 0.3; assertThat("Paused within a short time", delay * 1e-9, lessThanOrEqualTo(time)); - - computer.shutdown(); - return MachineResult.OK; }); manager.createLoopingComputer(); diff --git a/projects/core/src/testFixtures/kotlin/dan200/computercraft/test/core/computer/KotlinComputerManager.kt b/projects/core/src/testFixtures/kotlin/dan200/computercraft/test/core/computer/KotlinComputerManager.kt deleted file mode 100644 index 8a93930e1..000000000 --- a/projects/core/src/testFixtures/kotlin/dan200/computercraft/test/core/computer/KotlinComputerManager.kt +++ /dev/null @@ -1,189 +0,0 @@ -// SPDX-FileCopyrightText: 2022 The CC: Tweaked Developers -// -// SPDX-License-Identifier: MPL-2.0 - -package dan200.computercraft.test.core.computer - -import dan200.computercraft.api.lua.ILuaAPI -import dan200.computercraft.core.ComputerContext -import dan200.computercraft.core.computer.Computer -import dan200.computercraft.core.computer.TimeoutState -import dan200.computercraft.core.lua.MachineEnvironment -import dan200.computercraft.core.lua.MachineResult -import dan200.computercraft.core.terminal.Terminal -import java.util.* -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.TimeUnit -import java.util.concurrent.locks.Lock -import java.util.concurrent.locks.ReentrantLock - -typealias FakeComputerTask = (state: TimeoutState) -> MachineResult - -/** - * Creates "fake" computers, which just run user-defined tasks rather than Lua code. - */ -class KotlinComputerManager : AutoCloseable { - - private val machines: MutableMap> = HashMap() - private val context = ComputerContext.builder(BasicEnvironment()) - .luaFactory { env, _ -> DummyLuaMachine(env) } - .build() - private val errorLock: Lock = ReentrantLock() - private val hasError = errorLock.newCondition() - - @Volatile - private var error: Throwable? = null - override fun close() { - try { - context.ensureClosed(10, TimeUnit.SECONDS) - } catch (e: InterruptedException) { - throw IllegalStateException("Runtime thread was interrupted", e) - } - } - - fun context(): ComputerContext { - return context - } - - /** - * Create a new computer which pulls from our task queue. - * - * @return The computer. This will not be started yet, you must call [Computer.turnOn] and - * [Computer.tick] to do so. - */ - fun create(): Computer { - val queue: Queue = ConcurrentLinkedQueue() - val computer = Computer( - context, - BasicEnvironment(), - Terminal(51, 19, true), - 0, - ) - computer.addApi(QueuePassingAPI(queue)) // Inject an extra API to pass the queue to the machine. - machines[computer] = queue - return computer - } - - /** - * Create and start a new computer which loops forever. - */ - fun createLoopingComputer() { - val computer = create() - enqueueForever(computer) { - Thread.sleep(100) - MachineResult.OK - } - computer.turnOn() - computer.tick() - } - - /** - * Enqueue a task on a computer. - * - * @param computer The computer to enqueue the work on. - * @param task The task to run. - */ - fun enqueue(computer: Computer, task: FakeComputerTask) { - machines[computer]!!.offer(task) - } - - /** - * Enqueue a repeated task on a computer. This is automatically requeued when the task finishes, meaning the task - * queue is never empty. - * - * @param computer The computer to enqueue the work on. - * @param task The task to run. - */ - private fun enqueueForever(computer: Computer, task: FakeComputerTask) { - machines[computer]!!.offer { - val result = task(it) - enqueueForever(computer, task) - computer.queueEvent("some_event", null) - result - } - } - - /** - * Sleep for a given period, immediately propagating any exceptions thrown by a computer. - * - * @param delay The duration to sleep for. - * @param unit The time unit the duration is measured in. - * @throws Exception An exception thrown by a running computer. - */ - @Throws(Exception::class) - fun sleep(delay: Long, unit: TimeUnit?) { - errorLock.lock() - try { - rethrowIfNeeded() - if (hasError.await(delay, unit)) rethrowIfNeeded() - } finally { - errorLock.unlock() - } - } - - /** - * Start a computer and wait for it to finish. - * - * @param computer The computer to wait for. - * @throws Exception An exception thrown by a running computer. - */ - @Throws(Exception::class) - fun startAndWait(computer: Computer) { - computer.turnOn() - computer.tick() - do { - sleep(100, TimeUnit.MILLISECONDS) - } while (context.computerScheduler().hasPendingWork() || computer.isOn) - - rethrowIfNeeded() - } - - @Throws(Exception::class) - private fun rethrowIfNeeded() { - val error = error ?: return - throw error - } - - private class QueuePassingAPI constructor(val tasks: Queue) : ILuaAPI { - override fun getNames(): Array = arrayOf() - } - - private inner class DummyLuaMachine(private val environment: MachineEnvironment) : KotlinLuaMachine(environment) { - private val tasks: Queue = - environment.apis.asSequence().filterIsInstance(QueuePassingAPI::class.java).first().tasks - - override fun getTask(): (suspend KotlinLuaMachine.() -> Unit)? { - try { - val task = tasks.remove() - return { - try { - task(environment.timeout) - } catch (e: Throwable) { - reportError(e) - } - } - } catch (e: Throwable) { - reportError(e) - return null - } - } - - override fun close() {} - - private fun reportError(e: Throwable) { - errorLock.lock() - try { - if (error == null) { - error = e - hasError.signal() - } else { - error!!.addSuppressed(e) - } - } finally { - errorLock.unlock() - } - - if (e is Exception || e is AssertionError) return else throw e - } - } -} diff --git a/projects/web/src/main/java/dan200/computercraft/core/computer/computerthread/TComputerThread.java b/projects/web/src/main/java/dan200/computercraft/core/computer/computerthread/TComputerThread.java index 52254e603..211faed33 100644 --- a/projects/web/src/main/java/dan200/computercraft/core/computer/computerthread/TComputerThread.java +++ b/projects/web/src/main/java/dan200/computercraft/core/computer/computerthread/TComputerThread.java @@ -5,57 +5,110 @@ package dan200.computercraft.core.computer.computerthread; import cc.tweaked.web.js.Callbacks; +import dan200.computercraft.core.computer.TimeoutState; +import dan200.computercraft.core.metrics.MetricsObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.teavm.jso.browser.TimerHandler; import java.util.ArrayDeque; import java.util.concurrent.TimeUnit; /** - * A reimplementation of {@link ComputerThread} which, well, avoids any threading! + * An implementation of {@link ComputerScheduler} which executes work as soon as possible via + * {@link Callbacks#setImmediate(TimerHandler)}. *

- * This instead just exucutes work as soon as possible via {@link Callbacks#setImmediate(TimerHandler)}. Timeouts are - * instead handled via polling, see {@link cc.tweaked.web.builder.PatchCobalt}. + * Timeouts are instead handled via polling, see {@link cc.tweaked.web.builder.PatchCobalt}. + * + * @see ComputerThread */ -public class TComputerThread { - private static final ArrayDeque executors = new ArrayDeque<>(); - private final TimerHandler callback = this::workOnce; +public class TComputerThread implements ComputerScheduler { + private static final Logger LOG = LoggerFactory.getLogger(TComputerThread.class); + private static final long SCALED_PERIOD = 50 * 1_000_000L; + + private static final ArrayDeque executors = new ArrayDeque<>(); + private static final TimerHandler callback = TComputerThread::workOnce; public TComputerThread(int threads) { } - public void queue(ComputerExecutor executor) { - if (executor.onComputerQueue) throw new IllegalStateException("Cannot queue already queued executor"); - executor.onComputerQueue = true; - - if (executors.isEmpty()) Callbacks.setImmediate(callback); - executors.add(executor); + @Override + public Executor createExecutor(Worker worker, MetricsObserver metrics) { + return new ExecutorImpl(worker); } - private void workOnce() { + private static void workOnce() { var executor = executors.poll(); if (executor == null) throw new IllegalStateException("Working, but executor is null"); - if (!executor.onComputerQueue) throw new IllegalArgumentException("Working but not on queue"); executor.beforeWork(); try { - executor.work(); + executor.worker.work(); } catch (Exception e) { - e.printStackTrace(); + LOG.error("Error running computer", e); + executor.worker.abortWithError(); } + executor.afterWork(); - if (executor.afterWork()) executors.push(executor); if (!executors.isEmpty()) Callbacks.setImmediate(callback); } - public boolean hasPendingWork() { - return true; - } - - public long scaledPeriod() { - return 50 * 1_000_000L; - } - + @Override public boolean stop(long timeout, TimeUnit unit) { return true; } + + /** + * The {@link Executor} for our scheduler. + */ + private static final class ExecutorImpl implements ComputerScheduler.Executor { + final ComputerScheduler.Worker worker; + private final TimeoutImpl timeout = new TimeoutImpl(); + private boolean onQueue; + + private ExecutorImpl(Worker worker) { + this.worker = worker; + } + + @Override + public void submit() { + if (onQueue) return; + onQueue = true; + + if (executors.isEmpty()) Callbacks.setImmediate(callback); + executors.add(this); + } + + void beforeWork() { + if (!onQueue) throw new IllegalArgumentException("Working but not on queue"); + onQueue = false; + timeout.startTimer(SCALED_PERIOD); + } + + void afterWork() { + timeout.reset(); + } + + @Override + public TimeoutState timeoutState() { + return timeout; + } + + @Override + public long getRemainingTime() { + return timeout.getRemainingTime(); + } + + @Override + public void setRemainingTime(long time) { + timeout.setRemainingTime(time); + } + } + + private static final class TimeoutImpl extends ManagedTimeoutState { + @Override + protected boolean shouldPause() { + return true; + } + } }