Merge pull request #218 from SquidDev-CC/ComputerCraft/feature/new-computer-thread

Rewrite the computer thread system
This commit is contained in:
SquidDev 2017-11-14 21:33:20 +00:00
commit 845118e9e2
1 changed files with 274 additions and 187 deletions

View File

@ -8,223 +8,310 @@
import dan200.computercraft.ComputerCraft;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashSet;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class ComputerThread
{
private static final Object m_lock;
private static Thread m_thread;
private static final WeakHashMap <Object, LinkedBlockingQueue<ITask>> m_computerTasks;
private static final ArrayList <LinkedBlockingQueue<ITask>> m_computerTasksActive;
private static final ArrayList <LinkedBlockingQueue<ITask>> m_computerTasksPending;
private static final Object m_defaultQueue;
private static final Object m_monitor;
private static final int QUEUE_LIMIT = 256;
private static boolean m_running;
private static boolean m_stopped;
static
{
m_lock = new Object();
m_thread = null;
m_computerTasks = new WeakHashMap<>();
m_computerTasksPending = new ArrayList<>();
m_computerTasksActive = new ArrayList<>();
m_defaultQueue = new Object();
m_monitor = new Object();
m_running = false;
m_stopped = false;
}
/**
* Lock used for modifications to the object
*/
private static final Object s_stateLock = new Object();
/**
* Lock for various task operations
*/
private static final Object s_taskLock = new Object();
/**
* Map of objects to task list
*/
private static final WeakHashMap<Object, BlockingQueue<ITask>> s_computerTaskQueues = new WeakHashMap<>();
/**
* Active queues to execute
*/
private static final BlockingQueue<BlockingQueue<ITask>> s_computerTasksActive = new LinkedBlockingQueue<>();
private static final Set<BlockingQueue<ITask>> s_computerTasksActiveSet = new HashSet<>();
/**
* The default object for items which don't have an owner
*/
private static final Object s_defaultOwner = new Object();
/**
* Whether the thread is stopped or should be stopped
*/
private static boolean s_stopped = false;
/**
* The thread tasks execute on
*/
private static Thread s_thread = null;
private static final AtomicInteger s_counter = new AtomicInteger( 1 );
/**
* Start the computer thread
*/
public static void start()
{
synchronized( m_lock )
synchronized( s_stateLock )
{
if( m_running )
s_stopped = false;
if( s_thread == null || !s_thread.isAlive() )
{
m_stopped = false;
return;
SecurityManager manager = System.getSecurityManager();
final ThreadGroup group = manager == null ? Thread.currentThread().getThreadGroup() : manager.getThreadGroup();
Thread thread = s_thread = new Thread( group, new TaskExecutor(), "ComputerCraft-Computer-Manager" );
thread.setDaemon( true );
thread.start();
}
m_thread = new Thread( () ->
}
}
/**
* Attempt to stop the computer thread
*/
public static void stop()
{
synchronized( s_stateLock )
{
if( s_thread != null )
{
s_stopped = true;
if( s_thread.isAlive() )
{
s_thread.interrupt();
}
}
}
synchronized( s_taskLock )
{
s_computerTaskQueues.clear();
s_computerTasksActive.clear();
s_computerTasksActiveSet.clear();
}
}
/**
* Queue a task to execute on the thread
*
* @param task The task to execute
* @param computer The computer to execute it on, use {@code null} to execute on the default object.
*/
public static void queueTask( ITask task, Computer computer )
{
Object queueObject = computer == null ? s_defaultOwner : computer;
BlockingQueue<ITask> queue;
synchronized( s_computerTaskQueues )
{
queue = s_computerTaskQueues.get( queueObject );
if( queue == null )
{
s_computerTaskQueues.put( queueObject, queue = new LinkedBlockingQueue<>( QUEUE_LIMIT ) );
}
}
synchronized( s_taskLock )
{
if( queue.offer( task ) && !s_computerTasksActiveSet.contains( queue ) )
{
s_computerTasksActive.add( queue );
s_computerTasksActiveSet.add( queue );
}
}
}
/**
* Responsible for pulling and managing computer tasks. This pulls a task from {@link #s_computerTasksActive},
* creates a new thread using {@link TaskRunner} or reuses a previous one and uses that to execute the task.
*
* If the task times out, then it will attempt to interrupt the {@link TaskRunner} instance.
*/
private static final class TaskExecutor implements Runnable
{
private TaskRunner runner;
private Thread thread;
@Override
public void run()
{
try
{
while( true )
{
synchronized( m_computerTasksPending )
{
if (!m_computerTasksPending.isEmpty())
{
Iterator<LinkedBlockingQueue<ITask>> it = m_computerTasksPending.iterator();
while(it.hasNext())
{
LinkedBlockingQueue<ITask> queue = it.next();
if (!m_computerTasksActive.contains(queue))
{
m_computerTasksActive.add(queue);
}
it.remove();
}
/*
m_computerTasksActive.addAll(m_computerTasksPending); // put any that have been added since
m_computerTasksPending.clear();
*/
}
}
Iterator<LinkedBlockingQueue<ITask>> it = m_computerTasksActive.iterator();
while (it.hasNext())
{
LinkedBlockingQueue<ITask> queue = it.next();
if (queue == null || queue.isEmpty()) // we don't need the blocking part of the queue. Null check to ensure it exists due to a weird NPE I got
{
continue;
}
synchronized( m_lock )
{
if( m_stopped )
{
m_running = false;
m_thread = null;
return;
}
}
try
{
final ITask task = queue.take();
// Wait for an active queue to execute
BlockingQueue<ITask> queue = s_computerTasksActive.take();
// Create the task
Thread worker = new Thread( () ->
{
try {
task.execute();
} catch( Throwable e ) {
ComputerCraft.log.error( "Error running task", e );
}
} );
// Run the task
worker.setDaemon(true);
worker.start();
worker.join( 7000 );
if( worker.isAlive() )
{
// Task ran for too long
// Initiate escape plan
Computer computer = task.getOwner();
if( computer != null )
{
// Step 1: Soft abort
computer.abort( false );
worker.join( 1500 );
if( worker.isAlive() )
{
// Step 2: Hard abort
computer.abort( true );
worker.join( 1500 );
}
}
// Step 3: abandon
if( worker.isAlive() )
{
// ComputerCraft.log.warn( "Failed to abort Computer " + computer.getID() + ". Dangling lua thread could cause errors." );
worker.interrupt();
}
}
}
catch( InterruptedException e )
{
continue;
}
synchronized (queue)
{
if (queue.isEmpty())
{
it.remove();
}
}
}
while (m_computerTasksActive.isEmpty() && m_computerTasksPending.isEmpty())
// If threads should be stopped then return
synchronized( s_stateLock )
{
synchronized (m_monitor)
{
try
{
m_monitor.wait();
}
catch( InterruptedException e )
{
}
}
if( s_stopped ) return;
}
execute( queue );
}
}, "Computer Dispatch Thread" );
m_thread.setDaemon(true);
m_thread.start();
m_running = true;
}
}
public static void stop()
{
synchronized( m_lock )
{
if( m_running )
}
catch( InterruptedException ignored )
{
m_stopped = true;
m_thread.interrupt();
Thread.currentThread().interrupt();
}
}
}
public static void queueTask( ITask _task, Computer computer )
{
Object queueObject = computer;
if (queueObject == null)
{
queueObject = m_defaultQueue;
}
LinkedBlockingQueue<ITask> queue = m_computerTasks.get(queueObject);
if (queue == null)
private void execute( BlockingQueue<ITask> queue ) throws InterruptedException
{
m_computerTasks.put(queueObject, queue = new LinkedBlockingQueue<>( 256 ));
}
synchronized ( m_computerTasksPending )
{
if( queue.offer( _task ) )
ITask task = queue.remove();
if( thread == null || !thread.isAlive() )
{
if( !m_computerTasksPending.contains( queue ) )
runner = new TaskRunner();
SecurityManager manager = System.getSecurityManager();
final ThreadGroup group = manager == null ? Thread.currentThread().getThreadGroup() : manager.getThreadGroup();
Thread thread = this.thread = new Thread( group, runner, "ComputerCraft-Computer-Runner" + s_counter.getAndIncrement() );
thread.setDaemon( true );
thread.start();
}
// Execute the task
runner.submit( task );
try
{
// If we timed out rather than exiting:
boolean done = runner.await( 7000 );
if( !done )
{
m_computerTasksPending.add( queue );
// Attempt to soft then hard abort
Computer computer = task.getOwner();
if( computer != null )
{
computer.abort( false );
done = runner.await( 1500 );
if( !done )
{
computer.abort( true );
done = runner.await( 1500 );
}
}
// Interrupt the thread
if( !done )
{
thread.interrupt();
thread = null;
runner = null;
}
}
}
else
finally
{
//System.out.println( "Event queue overflow" );
// Re-add it back onto the queue or remove it
synchronized( s_taskLock )
{
if( queue.isEmpty() )
{
s_computerTasksActiveSet.remove( queue );
}
else
{
s_computerTasksActive.add( queue );
}
}
}
}
synchronized (m_monitor)
}
/**
* Responsible for the actual running of tasks. It waitin for the {@link TaskRunner#input} semaphore to be
* triggered, consumes a task and then triggers {@link TaskRunner#finished}.
*/
private static final class TaskRunner implements Runnable
{
private final Semaphore input = new Semaphore();
private final Semaphore finished = new Semaphore();
private ITask task;
@Override
public void run()
{
m_monitor.notify();
try
{
while( true )
{
input.await();
try
{
task.execute();
}
catch( RuntimeException e )
{
ComputerCraft.log.error( "Error running task.", e );
}
task = null;
finished.signal();
}
}
catch( InterruptedException e )
{
ComputerCraft.log.error( "Error running task.", e );
Thread.currentThread().interrupt();
}
}
void submit( ITask task )
{
this.task = task;
input.signal();
}
boolean await( long timeout ) throws InterruptedException
{
return finished.await( timeout );
}
}
/**
* A simple method to allow awaiting/providing a signal.
*
* Java does provide similar classes, but I only needed something simple.
*/
private static final class Semaphore
{
private volatile boolean state = false;
synchronized void signal()
{
state = true;
notify();
}
synchronized void await() throws InterruptedException
{
while( !state ) wait();
state = false;
}
synchronized boolean await( long timeout ) throws InterruptedException
{
if( !state )
{
wait( timeout );
if( !state ) return false;
}
state = false;
return true;
}
}
}