1
0
mirror of https://github.com/SquidDev-CC/CC-Tweaked synced 2025-01-28 18:04:47 +00:00

Rewrite the computer thread system

This makes a couple of significant changes to the original system, to
reduce the number of threads created and allow for multiple threads in
the future. There are several notable changes from the original
implementation:

 - A blocking queue is used for the main task queue queue. This removes
   the need for the "monitor" variable and allows for multiple threads
   polling this queue in the future.
 - The thread used to execute tasks is "cached" between tasks,
   significantly reducing the number of threads which need to be
   created. If a task needs to be stopped then the thread is then
   terminated and a new one constructed, though this rarely happens.
This commit is contained in:
SquidDev 2017-05-08 18:33:47 +01:00
parent 61ff91f237
commit 85c556d324

View File

@ -8,223 +8,302 @@ package dan200.computercraft.core.computer;
import dan200.computercraft.ComputerCraft; import dan200.computercraft.ComputerCraft;
import java.util.ArrayList; import java.util.HashSet;
import java.util.Iterator; import java.util.Set;
import java.util.WeakHashMap; import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class ComputerThread public class ComputerThread
{ {
private static final Object m_lock; private static final int QUEUE_LIMIT = 256;
private static Thread m_thread; /**
private static final WeakHashMap <Object, LinkedBlockingQueue<ITask>> m_computerTasks; * Lock used for modifications to the object
private static final ArrayList <LinkedBlockingQueue<ITask>> m_computerTasksActive; */
private static final ArrayList <LinkedBlockingQueue<ITask>> m_computerTasksPending; private static final Object s_stateLock = new Object();
private static final Object m_defaultQueue;
private static final Object m_monitor;
private static boolean m_running; /**
private static boolean m_stopped; * Lock for various task operations
*/
private static final Object s_taskLock = new Object();
static /**
{ * Map of objects to task list
m_lock = new Object(); */
m_thread = null; private static final WeakHashMap<Object, BlockingQueue<ITask>> s_computerTaskQueues = new WeakHashMap<>();
m_computerTasks = new WeakHashMap<>();
m_computerTasksPending = new ArrayList<>();
m_computerTasksActive = new ArrayList<>();
m_defaultQueue = new Object();
m_monitor = new Object();
m_running = false;
m_stopped = false;
}
/**
* Active queues to execute
*/
private static final BlockingQueue<BlockingQueue<ITask>> s_computerTasksActive = new LinkedBlockingQueue<>();
private static final Set<BlockingQueue<ITask>> s_computerTasksActiveSet = new HashSet<>();
/**
* The default object for items which don't have an owner
*/
private static final Object s_defaultOwner = new Object();
/**
* Whether the thread is stopped or should be stopped
*/
private static boolean s_stopped = false;
/**
* The thread tasks execute on
*/
private static Thread s_thread = null;
private static final AtomicInteger s_counter = new AtomicInteger( 1 );
/**
* Start the computer thread
*/
public static void start() public static void start()
{ {
synchronized( m_lock ) synchronized( s_stateLock )
{ {
if( m_running ) s_stopped = false;
if( s_thread == null || !s_thread.isAlive() )
{ {
m_stopped = false; SecurityManager manager = System.getSecurityManager();
return; final ThreadGroup group = manager == null ? Thread.currentThread().getThreadGroup() : manager.getThreadGroup();
Thread thread = s_thread = new Thread( group, new TaskExecutor(), "ComputerCraft-Computer-Manager" );
thread.setDaemon( true );
thread.start();
}
}
} }
m_thread = new Thread( () -> /**
* Attempt to stop the computer thread
*/
public static void stop()
{
synchronized( s_stateLock )
{
if( s_thread != null )
{
s_stopped = true;
if( s_thread.isAlive() )
{
s_thread.interrupt();
}
}
}
}
/**
* Queue a task to execute on the thread
*
* @param task The task to execute
* @param computer The computer to execute it on, use {@code null} to execute on the default object.
*/
public static void queueTask( ITask task, Computer computer )
{
Object queueObject = computer == null ? s_defaultOwner : computer;
BlockingQueue<ITask> queue;
synchronized( s_computerTaskQueues )
{
queue = s_computerTaskQueues.get( queueObject );
if( queue == null )
{
s_computerTaskQueues.put( queueObject, queue = new LinkedBlockingQueue<>( QUEUE_LIMIT ) );
}
}
synchronized( s_taskLock )
{
if( queue.offer( task ) && !s_computerTasksActiveSet.contains( queue ) )
{
s_computerTasksActive.add( queue );
s_computerTasksActiveSet.add( queue );
}
}
}
/**
* Responsible for pulling and managing computer tasks. This pulls a task from {@link #s_computerTasksActive},
* creates a new thread using {@link TaskRunner} or reuses a previous one and uses that to execute the task.
*
* If the task times out, then it will attempt to interrupt the {@link TaskRunner} instance.
*/
private static final class TaskExecutor implements Runnable
{
private TaskRunner runner;
private Thread thread;
@Override
public void run()
{
try
{ {
while( true ) while( true )
{ {
synchronized( m_computerTasksPending ) // Wait for an active queue to execute
{ BlockingQueue<ITask> queue = s_computerTasksActive.take();
if (!m_computerTasksPending.isEmpty())
{
Iterator<LinkedBlockingQueue<ITask>> it = m_computerTasksPending.iterator();
while(it.hasNext())
{
LinkedBlockingQueue<ITask> queue = it.next();
if (!m_computerTasksActive.contains(queue)) // If threads should be stopped then return
synchronized( s_stateLock )
{ {
m_computerTasksActive.add(queue); if( s_stopped ) return;
} }
it.remove();
execute( queue );
} }
/* }
m_computerTasksActive.addAll(m_computerTasksPending); // put any that have been added since catch( InterruptedException ignored )
m_computerTasksPending.clear(); {
*/
} }
} }
Iterator<LinkedBlockingQueue<ITask>> it = m_computerTasksActive.iterator(); private void execute( BlockingQueue<ITask> queue )
while (it.hasNext())
{ {
LinkedBlockingQueue<ITask> queue = it.next(); ITask task = queue.remove();
if (queue == null || queue.isEmpty()) // we don't need the blocking part of the queue. Null check to ensure it exists due to a weird NPE I got if( thread == null || !thread.isAlive() )
{ {
continue; runner = new TaskRunner();
SecurityManager manager = System.getSecurityManager();
final ThreadGroup group = manager == null ? Thread.currentThread().getThreadGroup() : manager.getThreadGroup();
Thread thread = this.thread = new Thread( group, runner, "ComputerCraft-Computer-Runner" + s_counter.getAndIncrement() );
thread.setDaemon( true );
thread.start();
} }
synchronized( m_lock ) // Execute the task
{ runner.submit( task );
if( m_stopped )
{
m_running = false;
m_thread = null;
return;
}
}
try try
{ {
final ITask task = queue.take(); // If we timed out rather than exiting:
boolean done = runner.await( 7000 );
// Create the task if( !done )
Thread worker = new Thread( () ->
{ {
try { // Attempt to soft then hard abort
task.execute();
} catch( Throwable e ) {
ComputerCraft.log.error( "Error running task", e );
}
} );
// Run the task
worker.setDaemon(true);
worker.start();
worker.join( 7000 );
if( worker.isAlive() )
{
// Task ran for too long
// Initiate escape plan
Computer computer = task.getOwner(); Computer computer = task.getOwner();
if( computer != null ) if( computer != null )
{ {
// Step 1: Soft abort
computer.abort( false ); computer.abort( false );
worker.join( 1500 );
if( worker.isAlive() ) done = runner.await( 1500 );
if( !done )
{ {
// Step 2: Hard abort
computer.abort( true ); computer.abort( true );
worker.join( 1500 ); done = runner.await( 1500 );
} }
} }
// Step 3: abandon // Interrupt the thread
if( worker.isAlive() ) if( !done )
{ {
// ComputerCraft.log.warn( "Failed to abort Computer " + computer.getID() + ". Dangling lua thread could cause errors." ); thread.interrupt();
worker.interrupt(); thread = null;
runner = null;
} }
} }
} }
catch( InterruptedException e ) catch( InterruptedException ignored )
{ {
continue;
} }
synchronized (queue) // Re-add it back onto the queue or remove it
synchronized( s_taskLock )
{ {
if (queue.isEmpty()) if( queue.isEmpty() )
{ {
it.remove(); s_computerTasksActiveSet.remove( queue );
}
}
}
while (m_computerTasksActive.isEmpty() && m_computerTasksPending.isEmpty())
{
synchronized (m_monitor)
{
try
{
m_monitor.wait();
}
catch( InterruptedException e )
{
}
}
}
}
}, "Computer Dispatch Thread" );
m_thread.setDaemon(true);
m_thread.start();
m_running = true;
}
}
public static void stop()
{
synchronized( m_lock )
{
if( m_running )
{
m_stopped = true;
m_thread.interrupt();
}
}
}
public static void queueTask( ITask _task, Computer computer )
{
Object queueObject = computer;
if (queueObject == null)
{
queueObject = m_defaultQueue;
}
LinkedBlockingQueue<ITask> queue = m_computerTasks.get(queueObject);
if (queue == null)
{
m_computerTasks.put(queueObject, queue = new LinkedBlockingQueue<>( 256 ));
}
synchronized ( m_computerTasksPending )
{
if( queue.offer( _task ) )
{
if( !m_computerTasksPending.contains( queue ) )
{
m_computerTasksPending.add( queue );
}
} }
else else
{ {
//System.out.println( "Event queue overflow" ); s_computerTasksActive.add( queue );
}
}
} }
} }
synchronized (m_monitor) /**
* Responsible for the actual running of tasks. It waitin for the {@link TaskRunner#input} semaphore to be
* triggered, consumes a task and then triggers {@link TaskRunner#finished}.
*/
private static final class TaskRunner implements Runnable
{ {
m_monitor.notify(); private final Semaphore input = new Semaphore();
private final Semaphore finished = new Semaphore();
private ITask task;
@Override
public void run()
{
try
{
while( true )
{
input.await();
try
{
task.execute();
}
catch( Throwable e )
{
ComputerCraft.log.error( "Error running task.", e );
}
task = null;
finished.signal();
}
}
catch( InterruptedException e )
{
ComputerCraft.log.error( "Error running task.", e );
}
}
void submit( ITask task )
{
this.task = task;
input.signal();
}
boolean await( long timeout ) throws InterruptedException
{
return finished.await( timeout );
}
}
/**
* A simple method to allow awaiting/providing a signal.
*
* Java does provide similar classes, but I only needed something simple.
*/
private static final class Semaphore
{
private volatile boolean state = false;
synchronized void signal()
{
state = true;
notify();
}
synchronized void await() throws InterruptedException
{
while( !state ) wait();
state = false;
}
synchronized boolean await( long timeout ) throws InterruptedException
{
if( !state )
{
wait( timeout );
if( !state ) return false;
}
state = false;
return true;
} }
} }
} }