diff --git a/src/main/java/dan200/computercraft/core/computer/ComputerThread.java b/src/main/java/dan200/computercraft/core/computer/ComputerThread.java index fc9071bb3..cf174d812 100644 --- a/src/main/java/dan200/computercraft/core/computer/ComputerThread.java +++ b/src/main/java/dan200/computercraft/core/computer/ComputerThread.java @@ -8,223 +8,302 @@ package dan200.computercraft.core.computer; import dan200.computercraft.ComputerCraft; -import java.util.ArrayList; -import java.util.Iterator; +import java.util.HashSet; +import java.util.Set; import java.util.WeakHashMap; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; - +import java.util.concurrent.atomic.AtomicInteger; + public class ComputerThread { - private static final Object m_lock; - - private static Thread m_thread; - private static final WeakHashMap > m_computerTasks; - private static final ArrayList > m_computerTasksActive; - private static final ArrayList > m_computerTasksPending; - private static final Object m_defaultQueue; - private static final Object m_monitor; + private static final int QUEUE_LIMIT = 256; - private static boolean m_running; - private static boolean m_stopped; - - static - { - m_lock = new Object(); - m_thread = null; - m_computerTasks = new WeakHashMap<>(); - m_computerTasksPending = new ArrayList<>(); - m_computerTasksActive = new ArrayList<>(); - m_defaultQueue = new Object(); - m_monitor = new Object(); - m_running = false; - m_stopped = false; - } - + /** + * Lock used for modifications to the object + */ + private static final Object s_stateLock = new Object(); + + /** + * Lock for various task operations + */ + private static final Object s_taskLock = new Object(); + + /** + * Map of objects to task list + */ + private static final WeakHashMap> s_computerTaskQueues = new WeakHashMap<>(); + + /** + * Active queues to execute + */ + private static final BlockingQueue> s_computerTasksActive = new LinkedBlockingQueue<>(); + private static final Set> s_computerTasksActiveSet = new HashSet<>(); + + /** + * The default object for items which don't have an owner + */ + private static final Object s_defaultOwner = new Object(); + + /** + * Whether the thread is stopped or should be stopped + */ + private static boolean s_stopped = false; + + /** + * The thread tasks execute on + */ + private static Thread s_thread = null; + + private static final AtomicInteger s_counter = new AtomicInteger( 1 ); + + /** + * Start the computer thread + */ public static void start() { - synchronized( m_lock ) + synchronized( s_stateLock ) { - if( m_running ) + s_stopped = false; + if( s_thread == null || !s_thread.isAlive() ) { - m_stopped = false; - return; + SecurityManager manager = System.getSecurityManager(); + final ThreadGroup group = manager == null ? Thread.currentThread().getThreadGroup() : manager.getThreadGroup(); + + Thread thread = s_thread = new Thread( group, new TaskExecutor(), "ComputerCraft-Computer-Manager" ); + thread.setDaemon( true ); + thread.start(); } - - m_thread = new Thread( () -> + } + } + + /** + * Attempt to stop the computer thread + */ + public static void stop() + { + synchronized( s_stateLock ) + { + if( s_thread != null ) + { + s_stopped = true; + if( s_thread.isAlive() ) + { + s_thread.interrupt(); + } + } + } + } + + /** + * Queue a task to execute on the thread + * + * @param task The task to execute + * @param computer The computer to execute it on, use {@code null} to execute on the default object. + */ + public static void queueTask( ITask task, Computer computer ) + { + Object queueObject = computer == null ? s_defaultOwner : computer; + + BlockingQueue queue; + synchronized( s_computerTaskQueues ) + { + queue = s_computerTaskQueues.get( queueObject ); + if( queue == null ) + { + s_computerTaskQueues.put( queueObject, queue = new LinkedBlockingQueue<>( QUEUE_LIMIT ) ); + } + } + + synchronized( s_taskLock ) + { + if( queue.offer( task ) && !s_computerTasksActiveSet.contains( queue ) ) + { + s_computerTasksActive.add( queue ); + s_computerTasksActiveSet.add( queue ); + } + } + } + + /** + * Responsible for pulling and managing computer tasks. This pulls a task from {@link #s_computerTasksActive}, + * creates a new thread using {@link TaskRunner} or reuses a previous one and uses that to execute the task. + * + * If the task times out, then it will attempt to interrupt the {@link TaskRunner} instance. + */ + private static final class TaskExecutor implements Runnable + { + private TaskRunner runner; + private Thread thread; + + @Override + public void run() + { + try { while( true ) { - synchronized( m_computerTasksPending ) - { - if (!m_computerTasksPending.isEmpty()) - { - Iterator> it = m_computerTasksPending.iterator(); - while(it.hasNext()) - { - LinkedBlockingQueue queue = it.next(); - - if (!m_computerTasksActive.contains(queue)) - { - m_computerTasksActive.add(queue); - } - it.remove(); - } - /* - m_computerTasksActive.addAll(m_computerTasksPending); // put any that have been added since - m_computerTasksPending.clear(); - */ - } - } - - Iterator> it = m_computerTasksActive.iterator(); - - while (it.hasNext()) - { - LinkedBlockingQueue queue = it.next(); - - if (queue == null || queue.isEmpty()) // we don't need the blocking part of the queue. Null check to ensure it exists due to a weird NPE I got - { - continue; - } - - synchronized( m_lock ) - { - if( m_stopped ) - { - m_running = false; - m_thread = null; - return; - } - } - - try - { - final ITask task = queue.take(); + // Wait for an active queue to execute + BlockingQueue queue = s_computerTasksActive.take(); - // Create the task - Thread worker = new Thread( () -> - { - try { - task.execute(); - } catch( Throwable e ) { - ComputerCraft.log.error( "Error running task", e ); - } - } ); - - // Run the task - worker.setDaemon(true); - worker.start(); - worker.join( 7000 ); - - if( worker.isAlive() ) - { - // Task ran for too long - // Initiate escape plan - Computer computer = task.getOwner(); - if( computer != null ) - { - // Step 1: Soft abort - computer.abort( false ); - worker.join( 1500 ); - - if( worker.isAlive() ) - { - // Step 2: Hard abort - computer.abort( true ); - worker.join( 1500 ); - } - } - - // Step 3: abandon - if( worker.isAlive() ) - { - // ComputerCraft.log.warn( "Failed to abort Computer " + computer.getID() + ". Dangling lua thread could cause errors." ); - worker.interrupt(); - } - } - } - catch( InterruptedException e ) - { - continue; - } - - synchronized (queue) - { - if (queue.isEmpty()) - { - it.remove(); - } - } - } - - while (m_computerTasksActive.isEmpty() && m_computerTasksPending.isEmpty()) + // If threads should be stopped then return + synchronized( s_stateLock ) { - synchronized (m_monitor) - { - try - { - m_monitor.wait(); - } - catch( InterruptedException e ) - { - } - } + if( s_stopped ) return; } + + execute( queue ); } - }, "Computer Dispatch Thread" ); - - m_thread.setDaemon(true); - m_thread.start(); - m_running = true; - } - } - - public static void stop() - { - synchronized( m_lock ) - { - if( m_running ) + } + catch( InterruptedException ignored ) { - m_stopped = true; - m_thread.interrupt(); } } - } - - public static void queueTask( ITask _task, Computer computer ) - { - Object queueObject = computer; - - if (queueObject == null) - { - queueObject = m_defaultQueue; - } - - LinkedBlockingQueue queue = m_computerTasks.get(queueObject); - if (queue == null) + private void execute( BlockingQueue queue ) { - m_computerTasks.put(queueObject, queue = new LinkedBlockingQueue<>( 256 )); - } - - synchronized ( m_computerTasksPending ) - { - if( queue.offer( _task ) ) + ITask task = queue.remove(); + + if( thread == null || !thread.isAlive() ) { - if( !m_computerTasksPending.contains( queue ) ) + runner = new TaskRunner(); + + SecurityManager manager = System.getSecurityManager(); + final ThreadGroup group = manager == null ? Thread.currentThread().getThreadGroup() : manager.getThreadGroup(); + Thread thread = this.thread = new Thread( group, runner, "ComputerCraft-Computer-Runner" + s_counter.getAndIncrement() ); + thread.setDaemon( true ); + thread.start(); + } + + // Execute the task + runner.submit( task ); + + try + { + // If we timed out rather than exiting: + boolean done = runner.await( 7000 ); + if( !done ) { - m_computerTasksPending.add( queue ); + // Attempt to soft then hard abort + Computer computer = task.getOwner(); + if( computer != null ) + { + computer.abort( false ); + + done = runner.await( 1500 ); + if( !done ) + { + computer.abort( true ); + done = runner.await( 1500 ); + } + } + + // Interrupt the thread + if( !done ) + { + thread.interrupt(); + thread = null; + runner = null; + } } } - else + catch( InterruptedException ignored ) { - //System.out.println( "Event queue overflow" ); + } + + // Re-add it back onto the queue or remove it + synchronized( s_taskLock ) + { + if( queue.isEmpty() ) + { + s_computerTasksActiveSet.remove( queue ); + } + else + { + s_computerTasksActive.add( queue ); + } } } - - synchronized (m_monitor) + } + + /** + * Responsible for the actual running of tasks. It waitin for the {@link TaskRunner#input} semaphore to be + * triggered, consumes a task and then triggers {@link TaskRunner#finished}. + */ + private static final class TaskRunner implements Runnable + { + private final Semaphore input = new Semaphore(); + private final Semaphore finished = new Semaphore(); + private ITask task; + + @Override + public void run() { - m_monitor.notify(); + try + { + while( true ) + { + input.await(); + try + { + task.execute(); + } + catch( Throwable e ) + { + ComputerCraft.log.error( "Error running task.", e ); + } + task = null; + finished.signal(); + } + } + catch( InterruptedException e ) + { + ComputerCraft.log.error( "Error running task.", e ); + } + } + + void submit( ITask task ) + { + this.task = task; + input.signal(); + } + + boolean await( long timeout ) throws InterruptedException + { + return finished.await( timeout ); + } + } + + /** + * A simple method to allow awaiting/providing a signal. + * + * Java does provide similar classes, but I only needed something simple. + */ + private static final class Semaphore + { + private volatile boolean state = false; + + synchronized void signal() + { + state = true; + notify(); + } + + synchronized void await() throws InterruptedException + { + while( !state ) wait(); + state = false; + } + + synchronized boolean await( long timeout ) throws InterruptedException + { + if( !state ) + { + wait( timeout ); + if( !state ) return false; + } + state = false; + return true; } } }