1
0
mirror of https://github.com/SquidDev-CC/CC-Tweaked synced 2024-12-12 11:10:29 +00:00

An initial prototype of main thread rate limiting

Unlike ComputerThread, we do not have a single source of tasks, and so
need a smarter way to handle scheduling and rate limiting. This
introduces a cooldown system, which works on both a global and
per-computer level:

Each computer is allowed to do some work for 5ms. If they go over that
budget, then they are marked as "hot", and will not execute work on the
next tick, until they have cooled down. This ensures that _on average_
computers perform at most 5ms of work per tick.

Obviously this is a rather large time span, so we also apply a global
10ms to all computers. This uses the same cooldown principle, meaning we
keep to an average of 10ms, even if we go over budget.
This commit is contained in:
SquidDev 2019-03-19 12:24:36 +00:00
parent d0bf9e9cd7
commit 853e2622a1
9 changed files with 488 additions and 141 deletions

View File

@ -24,6 +24,7 @@ import dan200.computercraft.api.turtle.event.TurtleAction;
import dan200.computercraft.core.apis.AddressPredicate;
import dan200.computercraft.core.apis.ApiFactories;
import dan200.computercraft.core.apis.http.websocket.Websocket;
import dan200.computercraft.core.computer.MainThread;
import dan200.computercraft.core.filesystem.ComboMount;
import dan200.computercraft.core.filesystem.FileMount;
import dan200.computercraft.core.filesystem.JarMount;
@ -304,6 +305,7 @@ public class ComputerCraft
{
ComputerCraft.serverComputerRegistry.reset();
WirelessNetwork.resetNetworks();
MainThread.reset();
Tracking.reset();
}
}
@ -315,6 +317,7 @@ public class ComputerCraft
{
ComputerCraft.serverComputerRegistry.reset();
WirelessNetwork.resetNetworks();
MainThread.reset();
Tracking.reset();
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* <li>Keeps track of whether the computer is on and blinking.</li>
* <li>Monitors whether the computer's visible state (redstone, on/off/blinking) has changed.</li>
* <li>Passes commands and events to the {@link ComputerExecutor}.</li>
* <li>Passes main thread tasks to the {@link MainThreadExecutor}.</li>
* </ul>
*/
public class Computer
@ -39,6 +40,7 @@ public class Computer
private final IComputerEnvironment m_environment;
private final Terminal m_terminal;
private final ComputerExecutor executor;
private final MainThreadExecutor serverExecutor;
// Additional state about the computer and its environment.
private boolean m_blinking = false;
@ -55,6 +57,7 @@ public class Computer
m_terminal = terminal;
executor = new ComputerExecutor( this );
serverExecutor = new MainThreadExecutor( this );
}
IComputerEnvironment getComputerEnvironment()
@ -112,6 +115,43 @@ public class Computer
executor.queueEvent( event, args );
}
/**
* Queue a task to be run on the main thread, using {@link MainThread}.
*
* @param runnable The task to run
* @return If the task was successfully queued (namely, whether there is space on it).
*/
public boolean queueMainThread( Runnable runnable )
{
return serverExecutor.enqueue( runnable );
}
/**
* If this computer is allowed to execute work on the main thread.
*
* One only needs to use this if executing work outside of {@link #queueMainThread(Runnable)}.
*
* @return If we can execute work on the main thread this tick.
* @see #afterExecuteMainThread(long)
*/
public boolean canExecuteMainThread()
{
return MainThread.canExecute() && serverExecutor.canExecuteExternal();
}
/**
* Increment the time taken to execute work this tick.
*
* One only needs to use this if executing work outside of {@link #queueMainThread(Runnable)}.
*
* @param time The time, in nanoseconds.
* @see #canExecuteMainThread()
*/
public void afterExecuteMainThread( long time )
{
serverExecutor.afterExecuteExternal( time );
}
public int getID()
{
return m_id;

View File

@ -133,7 +133,7 @@ final class ComputerExecutor
*
* Note, this should be empty if this computer is off - it is cleared on shutdown and when turning on again.
*/
private final Queue<Event> eventQueue = new ArrayDeque<>();
private final Queue<Event> eventQueue = new ArrayDeque<>( 4 );
/**
* Whether we interrupted an event and so should resume it instead of executing another task.

View File

@ -112,7 +112,7 @@ public class ComputerThread
long at = a.virtualRuntime, bt = b.virtualRuntime;
if( at == bt ) return Integer.compare( a.hashCode(), b.hashCode() );
return Long.compare( at, bt );
return at < bt ? -1 : 1;
} );
/**

View File

@ -1,17 +0,0 @@
/*
* This file is part of ComputerCraft - http://www.computercraft.info
* Copyright Daniel Ratcliffe, 2011-2019. Do not distribute without permission.
* Send enquiries to dratcliffe@gmail.com
*/
package dan200.computercraft.core.computer;
import javax.annotation.Nonnull;
public interface ITask
{
@Nonnull
Computer getOwner();
void execute();
}

View File

@ -6,66 +6,181 @@
package dan200.computercraft.core.computer;
import dan200.computercraft.core.tracking.Tracking;
import dan200.computercraft.api.lua.ILuaTask;
import javax.annotation.Nonnull;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Runs tasks on the main (server) thread, ticks {@link MainThreadExecutor}s, and limits how much time is used this
* tick.
*
* Similar to {@link MainThreadExecutor}, the {@link MainThread} can be in one of three states: cool, hot and cooling.
* However, the implementation here is a little different:
*
* {@link MainThread} starts cool, and runs as many tasks as it can in the current {@link #budget}ns. Any external tasks
* (those run by tile entities, etc...) will also consume the budget
*
* Next tick, we put {@link #MAX_TICK_TIME} into our budget (and clamp it to that value to). If we're still over budget,
* then we should not execute <em>any</em> work (either as part of {@link MainThread} or externally).
*/
public class MainThread
{
private static final int MAX_TASKS_PER_TICK = 1000;
private static final int MAX_TASKS_TOTAL = 50000;
/**
* The maximum time that can be spent executing tasks in a single tick.
*
* Note, we will quite possibly go over this limit, as there's no way to tell how long a will take - this aims
* to be the upper bound of the <em>average</em> time.
*
* @see #budget
*/
private static final long MAX_TICK_TIME = TimeUnit.MILLISECONDS.toNanos( 10 );
private static final Queue<ITask> m_outstandingTasks = new ArrayDeque<>();
private static final Object m_nextUnusedTaskIDLock = new Object();
private static long m_nextUnusedTaskID = 0;
/**
* The ideal maximum time a computer can execute for in a tick.
*
* Note, we will quite possibly go over this limit, as there's no way to tell how long a task will take - this aims
* to be the upper bound of the <em>average</em> time.
*/
static final long MAX_COMPUTER_TIME = TimeUnit.MILLISECONDS.toNanos( 5 );
/**
* An internal counter for {@link ILuaTask} ids.
*
* @see dan200.computercraft.api.lua.ILuaContext#issueMainThreadTask(ILuaTask)
* @see #getUniqueTaskID()
*/
private static final AtomicLong lastTaskId = new AtomicLong();
/**
* The queue of {@link MainThreadExecutor}s with tasks to perform.
*/
private static final Queue<MainThreadExecutor> executors = new ArrayDeque<>();
/**
* The set of executors which went over budget in a previous tick, and are waiting for their time to run down.
*
* @see MainThreadExecutor#tickCooling()
* @see #cooling(MainThreadExecutor)
*/
private static final HashSet<MainThreadExecutor> cooling = new HashSet<>();
/**
* The current tick number. This is used by {@link MainThreadExecutor} to determine when to reset its own time
* counter.
*
* @see #currentTick()
*/
private static int currentTick;
/**
* The remaining budgeted time for this tick. This may be negative, in the case that we've gone over budget.
*/
private static long budget;
/**
* Whether we should be executing any work this tick.
*
* This is true iff {@code MAX_TICK_TIME - currentTime} was true <em>at the beginning of the tick</em>.
*/
private static boolean canExecute = true;
public static long getUniqueTaskID()
{
synchronized( m_nextUnusedTaskIDLock )
return lastTaskId.incrementAndGet();
}
static void queue( @Nonnull MainThreadExecutor executor )
{
synchronized( executors )
{
return ++m_nextUnusedTaskID;
if( executor.onQueue ) throw new IllegalStateException( "Cannot queue already queued executor" );
executor.onQueue = true;
executors.add( executor );
}
}
public static boolean queueTask( ITask task )
static void cooling( @Nonnull MainThreadExecutor executor )
{
synchronized( m_outstandingTasks )
{
if( m_outstandingTasks.size() < MAX_TASKS_TOTAL )
{
m_outstandingTasks.offer( task );
return true;
}
}
return false;
cooling.add( executor );
}
static void consumeTime( long time )
{
budget -= time;
}
static boolean canExecute()
{
return canExecute;
}
static int currentTick()
{
return currentTick;
}
public static void executePendingTasks()
{
int tasksThisTick = 0;
while( tasksThisTick < MAX_TASKS_PER_TICK )
// Move onto the next tick and cool down the global executor. We're allowed to execute if we have _any_ time
// allocated for this tick. This means we'll stick much closer to doing MAX_TICK_TIME work every tick.
//
// Of course, we'll go over the MAX_TICK_TIME most of the time, but eventually that overrun will accumulate
// and we'll skip a whole tick - bringing the average back down again.
currentTick++;
budget += Math.min( budget + MAX_TICK_TIME, MAX_TICK_TIME );
canExecute = budget > 0;
// Cool down any warm computers.
cooling.removeIf( MainThreadExecutor::tickCooling );
if( !canExecute ) return;
// Run until we meet the deadline.
long start = System.nanoTime();
long deadline = start + budget;
while( true )
{
ITask task = null;
synchronized( m_outstandingTasks )
MainThreadExecutor executor;
synchronized( executors )
{
task = m_outstandingTasks.poll();
executor = executors.poll();
}
if( task != null )
{
long start = System.nanoTime();
task.execute();
if( executor == null ) break;
long stop = System.nanoTime();
Computer computer = task.getOwner();
if( computer != null ) Tracking.addServerTiming( computer, stop - start );
long taskStart = System.nanoTime();
executor.execute();
++tasksThisTick;
}
else
long taskStop = System.nanoTime();
if( executor.afterExecute( taskStop - taskStart ) )
{
break;
synchronized( executors )
{
executors.add( executor );
}
}
if( taskStop >= deadline ) break;
}
consumeTime( System.nanoTime() - start );
}
public static void reset()
{
currentTick = 0;
budget = 0;
canExecute = true;
lastTaskId.set( 0 );
cooling.clear();
synchronized( executors )
{
executors.clear();
}
}
}

View File

@ -0,0 +1,228 @@
/*
* This file is part of ComputerCraft - http://www.computercraft.info
* Copyright Daniel Ratcliffe, 2011-2019. Do not distribute without permission.
* Send enquiries to dratcliffe@gmail.com
*/
package dan200.computercraft.core.computer;
import dan200.computercraft.core.tracking.Tracking;
import dan200.computercraft.shared.turtle.core.TurtleBrain;
import net.minecraft.tileentity.TileEntity;
import java.util.ArrayDeque;
import java.util.Queue;
import static dan200.computercraft.core.computer.MainThread.MAX_COMPUTER_TIME;
/**
* Keeps track of tasks that a {@link Computer} should run on the main thread and how long that has been spent executing
* them.
*
* This provides rate-limiting mechanism for tasks enqueued with {@link Computer#queueMainThread(Runnable)}, but also
* those run elsewhere (such as during the turtle's tick - see {@link TurtleBrain#update()}). In order to handle this,
* the executor goes through three stages:
*
* When {@link State#COOL}, the computer is allocated {@link MainThread#MAX_COMPUTER_TIME}ns to execute any work this
* tick. At the beginning of the tick, we execute as many {@link MainThread} tasks as possible, until our timeframe or
* the global time frame has expired.
*
* Then, when other objects (such as {@link TileEntity}) are ticked, we update how much time we've used using
* {@link Computer#afterExecuteMainThread(long)}.
*
* Now, if anywhere during this period, we use more than our allocated time slice, the executor is marked as
* {@link State#HOT}. This means it will no longer be able to execute {@link MainThread} tasks (though will still
* execute tile entity tasks, in order to prevent the main thread from exhausting work every tick).
*
* At the beginning of the next tick, we increment the budget e by {@link MainThread#MAX_COMPUTER_TIME} and any
* {@link State#HOT} executors are marked as {@link State#COOLING}. They will remain cooling until their budget is
* fully replenished (is equal to {@link MainThread#MAX_COMPUTER_TIME}). Note, this is different to {@link MainThread},
* which allows running when it has any budget left. When cooling, <em>no</em> tasks are executed - be they on the tile
* entity or main thread.
*
* This mechanism means that, on average, computers will use at most {@link MainThread#MAX_COMPUTER_TIME}ns per second,
* but one task source will not prevent others from executing.
*
* @see MainThread
* @see Computer#canExecuteMainThread()
* @see Computer#queueMainThread(Runnable)
* @see Computer#afterExecuteMainThread(long)
*/
final class MainThreadExecutor
{
/**
* The maximum number of {@link MainThread} tasks allowed on the queue.
*/
private static final int MAX_TASKS = 5000;
private final Computer computer;
/**
* A lock used for any changes to {@link #tasks}, or {@link #onQueue}. This will be
* used on the main thread, so locks should be kept as brief as possible.
*/
private final Object queueLock = new Object();
/**
* The queue of tasks which should be executed.
*
* @see #queueLock
*/
private final Queue<Runnable> tasks = new ArrayDeque<>( 4 );
/**
* Determines if this executor is currently present on the queue.
*
* This should be true iff {@link #tasks} is non-empty.
*
* @see #queueLock
* @see #enqueue(Runnable)
* @see #afterExecute(long)
*/
volatile boolean onQueue;
/**
* The remaining budgeted time for this tick. This may be negative, in the case that we've gone over budget.
*
* @see #tickCooling()
* @see #consumeTime(long)
*/
private long budget = 0;
/**
* The last tick that {@link #budget} was updated.
*
* @see #tickCooling()
* @see #consumeTime(long)
*/
private int currentTick = -1;
/**
* The current state of this executor.
*
* @see #canExecuteExternal()
*/
private State state = State.COOL;
MainThreadExecutor( Computer computer )
{
this.computer = computer;
}
/**
* Push a task onto this executor's queue, pushing it onto the {@link MainThread} if needed.
*
* @param runnable The task to run on the main thread.
* @return Whether this task was enqueued (namely, was there space).
*/
boolean enqueue( Runnable runnable )
{
synchronized( queueLock )
{
if( tasks.size() >= MAX_TASKS || !tasks.offer( runnable ) ) return false;
if( !onQueue && state == State.COOL ) MainThread.queue( this );
return true;
}
}
void execute()
{
if( state != State.COOL ) return;
Runnable task;
synchronized( queueLock )
{
task = tasks.poll();
}
if( task != null ) task.run();
}
/**
* Update the time taken to run an {@link #enqueue(Runnable)} task.
*
* @param time The time some task took to run.
* @return Whether this should be added back to the queue.
*/
boolean afterExecute( long time )
{
consumeTime( time );
synchronized( queueLock )
{
if( state != State.COOL || tasks.isEmpty() ) return onQueue = false;
return true;
}
}
/**
* Update the time taken to run an external task (one not part of {@link #tasks}), incrementing the appropriate
* statistics.
*
* @param time The time some task took to run
*/
void afterExecuteExternal( long time )
{
consumeTime( time );
MainThread.consumeTime( time );
}
/**
* Whether we should execute "external" tasks (ones not part of {@link #tasks}).
*
* @return Whether we can execute external tasks.
*/
boolean canExecuteExternal()
{
return state != State.COOLING;
}
private void consumeTime( long time )
{
Tracking.addServerTiming( computer, time );
// Reset the budget if moving onto a new tick. We know this is safe, as this will only have happened if
// #tickCooling() isn't called, and so we didn't overrun the previous tick.
if( currentTick != MainThread.currentTick() )
{
currentTick = MainThread.currentTick();
budget = MAX_COMPUTER_TIME;
}
budget -= time;
// If we've gone over our limit, mark us as having to cool down.
if( budget < 0 && state == State.COOL )
{
state = State.HOT;
MainThread.cooling( this );
}
}
/**
* Move this executor forward one tick, replenishing the budget by {@link MainThread#MAX_COMPUTER_TIME}.
*
* @return Whether this executor has cooled down, and so is safe to run again.
*/
boolean tickCooling()
{
state = State.COOLING;
currentTick = MainThread.currentTick();
budget += Math.min( budget + MAX_COMPUTER_TIME, MAX_COMPUTER_TIME );
if( budget < MAX_COMPUTER_TIME ) return false;
state = State.COOL;
synchronized( queueLock )
{
if( !tasks.isEmpty() && !onQueue ) MainThread.queue( this );
}
return true;
}
private enum State
{
COOL,
HOT,
COOLING,
}
}

View File

@ -9,7 +9,6 @@ package dan200.computercraft.core.lua;
import dan200.computercraft.ComputerCraft;
import dan200.computercraft.api.lua.*;
import dan200.computercraft.core.computer.Computer;
import dan200.computercraft.core.computer.ITask;
import dan200.computercraft.core.computer.MainThread;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.tracking.Tracking;
@ -536,53 +535,36 @@ public class CobaltLuaMachine implements ILuaMachine
{
// Issue command
final long taskID = MainThread.getUniqueTaskID();
final ITask iTask = new ITask()
{
@Nonnull
@Override
public Computer getOwner()
final Runnable iTask = () -> {
try
{
return m_computer;
Object[] results = task.execute();
if( results != null )
{
Object[] eventArguments = new Object[results.length + 2];
eventArguments[0] = taskID;
eventArguments[1] = true;
System.arraycopy( results, 0, eventArguments, 2, results.length );
m_computer.queueEvent( "task_complete", eventArguments );
}
else
{
m_computer.queueEvent( "task_complete", new Object[] { taskID, true } );
}
}
@Override
public void execute()
catch( LuaException e )
{
try
{
Object[] results = task.execute();
if( results != null )
{
Object[] eventArguments = new Object[results.length + 2];
eventArguments[0] = taskID;
eventArguments[1] = true;
System.arraycopy( results, 0, eventArguments, 2, results.length );
m_computer.queueEvent( "task_complete", eventArguments );
}
else
{
m_computer.queueEvent( "task_complete", new Object[] { taskID, true } );
}
}
catch( LuaException e )
{
m_computer.queueEvent( "task_complete", new Object[] {
taskID, false, e.getMessage()
} );
}
catch( Throwable t )
{
if( ComputerCraft.logPeripheralErrors )
{
ComputerCraft.log.error( "Error running task", t );
}
m_computer.queueEvent( "task_complete", new Object[] {
taskID, false, "Java Exception Thrown: " + t.toString()
} );
}
m_computer.queueEvent( "task_complete", new Object[] { taskID, false, e.getMessage() } );
}
catch( Throwable t )
{
if( ComputerCraft.logPeripheralErrors ) ComputerCraft.log.error( "Error running task", t );
m_computer.queueEvent( "task_complete", new Object[] {
taskID, false, "Java Exception Thrown: " + t.toString()
} );
}
};
if( MainThread.queueTask( iTask ) )
if( m_computer.queueMainThread( iTask ) )
{
return taskID;
}

View File

@ -13,7 +13,6 @@ import dan200.computercraft.api.lua.ILuaContext;
import dan200.computercraft.api.lua.LuaException;
import dan200.computercraft.api.peripheral.IPeripheral;
import dan200.computercraft.api.turtle.*;
import dan200.computercraft.core.tracking.Tracking;
import dan200.computercraft.shared.computer.blocks.ComputerProxy;
import dan200.computercraft.shared.computer.blocks.TileComputerBase;
import dan200.computercraft.shared.computer.core.ComputerFamily;
@ -957,53 +956,50 @@ public class TurtleBrain implements ITurtleAccess
private void updateCommands()
{
if( m_animation == TurtleAnimation.None )
if( m_animation != TurtleAnimation.None || m_commandQueue.isEmpty() ) return;
// If we've got a computer, ensure that we're allowed to perform work.
ServerComputer computer = m_owner.getServerComputer();
if( computer != null && !computer.getComputer().canExecuteMainThread() ) return;
// Pull a new command
TurtleCommandQueueEntry nextCommand = m_commandQueue.poll();
if( nextCommand == null ) return;
// Execute the command
long start = System.nanoTime();
TurtleCommandResult result = nextCommand.command.execute( this );
long end = System.nanoTime();
// Dispatch the callback
if( computer == null ) return;
computer.getComputer().afterExecuteMainThread( end - start );
int callbackID = nextCommand.callbackID;
if( callbackID < 0 ) return;
if( result != null && result.isSuccess() )
{
// Pull a new command
TurtleCommandQueueEntry nextCommand = m_commandQueue.poll();
if( nextCommand != null )
Object[] results = result.getResults();
if( results != null )
{
ServerComputer computer = m_owner.getServerComputer();
// Execute the command
long start = System.nanoTime();
TurtleCommandResult result = nextCommand.command.execute( this );
long end = System.nanoTime();
// Dispatch the callback
if( computer != null )
{
Tracking.addServerTiming( computer.getComputer(), end - start );
int callbackID = nextCommand.callbackID;
if( callbackID >= 0 )
{
if( result != null && result.isSuccess() )
{
Object[] results = result.getResults();
if( results != null )
{
Object[] arguments = new Object[results.length + 2];
arguments[0] = callbackID;
arguments[1] = true;
System.arraycopy( results, 0, arguments, 2, results.length );
computer.queueEvent( "turtle_response", arguments );
}
else
{
computer.queueEvent( "turtle_response", new Object[] {
callbackID, true
} );
}
}
else
{
computer.queueEvent( "turtle_response", new Object[] {
callbackID, false, result != null ? result.getErrorMessage() : null
} );
}
}
}
Object[] arguments = new Object[results.length + 2];
arguments[0] = callbackID;
arguments[1] = true;
System.arraycopy( results, 0, arguments, 2, results.length );
computer.queueEvent( "turtle_response", arguments );
}
else
{
computer.queueEvent( "turtle_response", new Object[] {
callbackID, true
} );
}
}
else
{
computer.queueEvent( "turtle_response", new Object[] {
callbackID, false, result != null ? result.getErrorMessage() : null
} );
}
}