Track allocations while executing computers

This adds a new "java_allocation" metric, which tracks the number of
bytes allocated while executing the computer (as measured by Java). This
is not a 100% reliable number, but hopefully gives some insight into
what computers are doing.
This commit is contained in:
Jonathan Coates 2023-11-09 18:36:35 +00:00
parent 1d365f5a0b
commit 76968f2f28
No known key found for this signature in database
GPG Key ID: B9E431FF07C98D06
8 changed files with 262 additions and 11 deletions

View File

@ -174,6 +174,7 @@ private void addTranslations() {
// Metrics
add(Metrics.COMPUTER_TASKS, "Tasks");
add(Metrics.SERVER_TASKS, "Server tasks");
add(Metrics.JAVA_ALLOCATION, "Java Allocations");
add(Metrics.PERIPHERAL_OPS, "Peripheral calls");
add(Metrics.FS_OPS, "Filesystem operations");
add(Metrics.HTTP_REQUESTS, "HTTP requests");

View File

@ -10,12 +10,14 @@
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.Metrics;
import dan200.computercraft.core.metrics.MetricsObserver;
import dan200.computercraft.core.metrics.ThreadAllocations;
import dan200.computercraft.core.util.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.util.Arrays;
import java.util.Objects;
import java.util.TreeSet;
import java.util.concurrent.ThreadFactory;
@ -476,6 +478,9 @@ public void run() {
}
private void runImpl() {
var workerThreadIds = new long[workersReadOnly().length];
Arrays.fill(workerThreadIds, Thread.currentThread().getId());
while (state.get() < CLOSED) {
computerLock.lock();
try {
@ -490,12 +495,32 @@ private void runImpl() {
computerLock.unlock();
}
checkRunners();
checkRunners(workerThreadIds);
}
}
private void checkRunners() {
for (@Nullable var runner : workersReadOnly()) {
private void checkRunners(long[] workerThreadIds) {
var workers = workersReadOnly();
long[] allocations;
if (ThreadAllocations.isSupported()) {
// If allocation tracking is supported, update the current thread IDs and then fetch the total allocated
// memory. When dealing with multiple workers, it's more efficient to getAllocatedBytes in bulk rather
// than, hence doing it within the worker loop.
// However, this does mean we need to maintain an array of worker thread IDs. We could have a shared
// array and update it within .addWorker(_), but that's got all sorts of thread-safety issues. It ends
// up being easier (and not too inefficient) to just recompute the array each time.
for (var i = 0; i < workers.length; i++) {
var runner = workers[i];
if (runner != null) workerThreadIds[i] = runner.owner.getId();
}
allocations = ThreadAllocations.getAllocatedBytes(workerThreadIds);
} else {
allocations = null;
}
for (var i = 0; i < workers.length; i++) {
var runner = workers[i];
if (runner == null) continue;
// If the worker has no work, skip
@ -505,6 +530,11 @@ private void checkRunners() {
// Refresh the timeout state. Will set the pause/soft timeout flags as appropriate.
executor.timeout.refresh();
// And track the allocated memory.
if (allocations != null) {
executor.updateAllocations(new ThreadAllocation(workerThreadIds[i], allocations[i]));
}
// If we're still within normal execution times (TIMEOUT) or soft abort (ABORT_TIMEOUT),
// then we can let the Lua machine do its work.
var remainingTime = executor.timeout.getRemainingTime();
@ -732,6 +762,9 @@ private final class ExecutorImpl implements Executor {
public static final AtomicReferenceFieldUpdater<ExecutorImpl, ExecutorState> STATE = AtomicReferenceFieldUpdater.newUpdater(
ExecutorImpl.class, ExecutorState.class, "$state"
);
public static final AtomicReferenceFieldUpdater<ExecutorImpl, ThreadAllocation> THREAD_ALLOCATION = AtomicReferenceFieldUpdater.newUpdater(
ExecutorImpl.class, ThreadAllocation.class, "$threadAllocation"
);
final Worker worker;
private final MetricsObserver metrics;
@ -742,6 +775,16 @@ private final class ExecutorImpl implements Executor {
*/
private volatile ExecutorState $state = ExecutorState.IDLE;
/**
* Information about allocations on the currently executing thread.
* <p>
* {@linkplain #beforeWork() Before starting any work}, we set this to the current thread and the current
* {@linkplain ThreadAllocations#getAllocatedBytes(long) amount of allocated memory}. When the computer
* {@linkplain #afterWork() finishes executing}, we set this back to null and compute the difference between the
* two, updating the {@link Metrics#JAVA_ALLOCATION} metric.
* <p>
* While work is in progress, {@link #updateAllocations(ThreadAllocation)} may also swap in a newer measurement
* for the same thread, reporting the bytes allocated since the previous one. Reads and writes go through the
* {@link #THREAD_ALLOCATION} field updater.
*/
private volatile @Nullable ThreadAllocation $threadAllocation = null;
/**
* The amount of time this computer has used on a theoretical machine which shares work evenly amongst computers.
*
@ -768,6 +811,11 @@ private final class ExecutorImpl implements Executor {
void beforeWork() {
    // Record when this run began and start the timeout timer.
    vRuntimeStart = System.nanoTime();
    timeout.startTimer(scaledPeriod());

    // Nothing more to do when the JVM cannot report per-thread allocations.
    if (!ThreadAllocations.isSupported()) return;

    // Snapshot how much this thread has allocated so far; later measurements are taken relative to it.
    var threadId = Thread.currentThread().getId();
    var baseline = new ThreadAllocation(threadId, ThreadAllocations.getAllocatedBytes(threadId));
    THREAD_ALLOCATION.set(this, baseline);
}
/**
@ -779,10 +827,46 @@ boolean afterWork() {
timeout.reset();
metrics.observe(Metrics.COMPUTER_TASKS, timeout.getExecutionTime());
if (ThreadAllocations.isSupported()) {
var current = Thread.currentThread().getId();
var info = THREAD_ALLOCATION.getAndSet(this, null);
assert info.threadId() == current;
var allocated = ThreadAllocations.getAllocatedBytes(current) - info.allocatedBytes();
if (allocated > 0) {
metrics.observe(Metrics.JAVA_ALLOCATION, allocated);
} else {
LOG.warn("Allocated a negative number of bytes!");
}
}
var state = STATE.getAndUpdate(this, ExecutorState::requeue);
return state == ExecutorState.REPEAT;
}
/**
 * Fold a fresh allocation measurement into this executor's running total.
 * <p>
 * The measurement is only accepted if it was taken on the thread we are currently tracking and is strictly
 * larger than the stored one. In that case the stored measurement is replaced and the difference is reported
 * under {@link Metrics#JAVA_ALLOCATION}.
 *
 * @param allocation The latest allocation information.
 */
void updateAllocations(ThreadAllocation allocation) {
    while (true) {
        // No stored measurement, or one for a different thread, means the worker has already finished and
        // this measurement is stale: drop it.
        var previous = THREAD_ALLOCATION.get(this);
        if (previous == null || previous.threadId() != allocation.threadId()) return;

        // A measurement which does not exceed the stored one must be out-of-date too.
        var delta = allocation.allocatedBytes() - previous.allocatedBytes();
        if (delta <= 0) return;

        // Only report the difference once we have successfully published the new measurement; if somebody
        // else changed the stored value in the meantime, start over with the fresh value.
        if (THREAD_ALLOCATION.compareAndSet(this, previous, allocation)) {
            metrics.observe(Metrics.JAVA_ALLOCATION, delta);
            return;
        }
    }
}
@Override
public void submit() {
var state = STATE.getAndUpdate(this, ExecutorState::enqueue);
@ -811,4 +895,13 @@ protected boolean shouldPause() {
return hasPendingWork();
}
}
/**
 * A snapshot of how much memory a single thread has allocated.
 *
 * @param threadId       The ID of the thread which was measured.
 * @param allocatedBytes The number of bytes that thread had allocated when the measurement was taken.
 */
private record ThreadAllocation(long threadId, long allocatedBytes) {}
}

View File

@ -14,6 +14,8 @@ private Metrics() {
public static final Metric.Event COMPUTER_TASKS = new Metric.Event("computer_tasks", "ns", Metric::formatTime);
public static final Metric.Event SERVER_TASKS = new Metric.Event("server_tasks", "ns", Metric::formatTime);
public static final Metric.Event JAVA_ALLOCATION = new Metric.Event("java_allocation", "bytes", Metric::formatBytes);
public static final Metric.Event PERIPHERAL_OPS = new Metric.Event("peripheral", "ns", Metric::formatTime);
public static final Metric.Event FS_OPS = new Metric.Event("fs", "ns", Metric::formatTime);

View File

@ -0,0 +1,114 @@
// SPDX-FileCopyrightText: 2023 The CC: Tweaked Developers
//
// SPDX-License-Identifier: MPL-2.0
package dan200.computercraft.core.metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
/**
 * Reports how much memory individual threads have allocated.
 * <p>
 * The underlying functionality is Hotspot-specific and is looked up reflectively, so it may not be available on
 * every JVM. Consumers should call {@link #isSupported()} before using any of the query methods.
 *
 * @see com.sun.management.ThreadMXBean
 */
public final class ThreadAllocations {
    private static final Logger LOG = LoggerFactory.getLogger(ThreadAllocations.class);

    /** Bound handle for {@code long getThreadAllocatedBytes(long)}, or {@code null} if unavailable. */
    private static final @Nullable MethodHandle threadAllocatedBytes;
    /** Bound handle for {@code long[] getThreadAllocatedBytes(long[])}, or {@code null} if unavailable. */
    private static final @Nullable MethodHandle threadsAllocatedBytes;

    static {
        MethodHandle singleHandle, bulkHandle;
        try {
            var beanType = Class.forName("com.sun.management.ThreadMXBean").asSubclass(ThreadMXBean.class);
            var bean = ManagementFactory.getPlatformMXBean(beanType);

            // Switch allocation tracking on.
            beanType.getMethod("setThreadAllocatedMemoryEnabled", boolean.class).invoke(bean, true);

            // Call the API once up-front, so that a failure disables tracking here rather than later on.
            beanType.getMethod("getCurrentThreadAllocatedBytes").invoke(bean);

            // Both overloads are resolved before either is published, so they are available (or not) together.
            var lookup = MethodHandles.publicLookup();
            var single = lookup
                .findVirtual(beanType, "getThreadAllocatedBytes", MethodType.methodType(long.class, long.class))
                .bindTo(bean);
            var bulk = lookup
                .findVirtual(beanType, "getThreadAllocatedBytes", MethodType.methodType(long[].class, long[].class))
                .bindTo(bean);

            singleHandle = single;
            bulkHandle = bulk;
        } catch (LinkageError | ReflectiveOperationException | RuntimeException e) {
            LOG.warn("Cannot track allocated memory of computer threads", e);
            singleHandle = null;
            bulkHandle = null;
        }

        threadAllocatedBytes = singleHandle;
        threadsAllocatedBytes = bulkHandle;
    }

    private ThreadAllocations() {
    }

    /**
     * Check whether the current JVM provides information about per-thread allocations.
     *
     * @return Whether per-thread allocation information is available.
     */
    public static boolean isSupported() {
        return threadAllocatedBytes != null;
    }

    /**
     * Get an approximation of the amount of memory a thread has allocated over its lifetime.
     *
     * @param threadId The ID of the thread.
     * @return The allocated memory, in bytes.
     * @throws UnsupportedOperationException If allocation tracking is not {@linkplain #isSupported() supported}.
     * @see com.sun.management.ThreadMXBean#getThreadAllocatedBytes(long)
     */
    public static long getAllocatedBytes(long threadId) {
        var handle = threadAllocatedBytes;
        if (handle == null) throw new UnsupportedOperationException("Allocated bytes are not supported");

        try {
            return (long) handle.invokeExact(threadId);
        } catch (Throwable t) {
            // invokeExact is declared to throw Throwable, but the target method declares no checked
            // exceptions, so rethrow whatever we caught as-is.
            throw throwUnchecked0(t);
        }
    }

    /**
     * Get an approximation of the amount of memory each of several threads has allocated over its lifetime.
     * <p>
     * This is equivalent to calling {@link #getAllocatedBytes(long)} for each thread in {@code threadIds}.
     *
     * @param threadIds An array of thread IDs.
     * @return An array with the same length as {@code threadIds}, containing the allocated memory for each thread.
     * @throws UnsupportedOperationException If allocation tracking is not {@linkplain #isSupported() supported}.
     * @see com.sun.management.ThreadMXBean#getThreadAllocatedBytes(long[])
     */
    public static long[] getAllocatedBytes(long[] threadIds) {
        var handle = threadsAllocatedBytes;
        if (handle == null) throw new UnsupportedOperationException("Allocated bytes are not supported");

        try {
            return (long[]) handle.invokeExact(threadIds);
        } catch (Throwable t) {
            // See the single-thread overload: nothing checked is expected here, so rethrow unchanged.
            throw throwUnchecked0(t);
        }
    }

    /**
     * Rethrow {@code t} without wrapping it, using an unchecked generic cast so that callers do not have to
     * declare it. This method never returns normally; the return type only exists so call sites can write
     * {@code throw throwUnchecked0(t)}.
     */
    @SuppressWarnings({ "unchecked", "TypeParameterUnusedInFormals" })
    private static <T extends Throwable> T throwUnchecked0(Throwable t) throws T {
        throw (T) t;
    }
}

View File

@ -5,10 +5,12 @@
package dan200.computercraft.core.computer.computerthread;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.Metric;
import dan200.computercraft.core.metrics.MetricsObserver;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.concurrent.GuardedBy;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@ -81,14 +83,15 @@ private interface Task {
void run(ComputerScheduler.Executor executor) throws InterruptedException;
}
public final class Worker implements ComputerScheduler.Worker {
public final class Worker implements ComputerScheduler.Worker, MetricsObserver {
private final Task run;
private final ComputerScheduler.Executor executor;
private long[] totals = new long[16];
volatile int executed = 0;
private Worker(ComputerScheduler scheduler, Task run) {
this.run = run;
this.executor = scheduler.createExecutor(this, MetricsObserver.discard());
this.executor = scheduler.createExecutor(this, this);
}
public ComputerScheduler.Executor executor() {
@ -138,5 +141,25 @@ public void unload() {
@Override
public void abortWithError() {
}
/**
 * Add {@code value} to the running total for {@code metric}, growing the backing array if required.
 * <p>
 * Totals are stored in an array indexed by {@link Metric#id()}. Updates are synchronized on this worker.
 *
 * @param metric The metric being observed.
 * @param value  The amount to add to this metric's total.
 */
private synchronized void observeImpl(Metric metric, long value) {
    var id = metric.id();
    // To hold index `id` the array needs at least `id + 1` slots. Growing to only `id` slots (which happens
    // whenever the id is at least twice the current length) would leave the array one element too short and
    // make the write below throw.
    if (id >= totals.length) totals = Arrays.copyOf(totals, Math.max(id + 1, totals.length * 2));
    totals[id] += value;
}
@Override
public void observe(Metric.Counter counter) {
// A counter observation carries no value of its own, so each one adds 1 to the counter's total.
observeImpl(counter, 1);
}
@Override
public void observe(Metric.Event event, long value) {
// Events accumulate the observed value into the event's running total.
observeImpl(event, value);
}
/**
 * Get the total recorded so far for a metric.
 *
 * @param metric The metric to look up.
 * @return The accumulated total, or {@code 0} if the metric's ID lies beyond the end of the totals array.
 */
public long getMetric(Metric metric) {
    // Work on a single read of the field, as observing a metric may replace the array with a larger copy.
    var snapshot = this.totals;
    if (metric.id() >= snapshot.length) return 0;
    return snapshot[metric.id()];
}
}
}

View File

@ -5,22 +5,21 @@
package dan200.computercraft.core.computer.computerthread;
import dan200.computercraft.core.computer.TimeoutState;
import dan200.computercraft.core.metrics.Metrics;
import dan200.computercraft.core.metrics.ThreadAllocations;
import dan200.computercraft.test.core.ConcurrentHelpers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.*;
import static org.junit.jupiter.api.Assertions.*;
@Timeout(value = 15)
@ -92,4 +91,21 @@ public void testPauseIfSomeOtherMachine() throws Exception {
manager.startAndWait(computer);
}
@Test
public void testAllocationTracking() throws Exception {
    Assumptions.assumeTrue(ThreadAllocations.isSupported(), "Allocation tracking is supported");

    // A 64 MiB slab.
    final int slabSize = 64 * 1024 * 1024;

    var worker = manager.createWorker((executor, timeout) -> {
        // Allocate the slab and pass it through toString in a (fairly naive) attempt to stop the allocation
        // being optimised away.
        var description = Objects.toString(new byte[slabSize]);
        assertNotEquals(0, description.length());
    });
    manager.startAndWait(worker);

    // The recorded allocations should cover the slab itself, plus at most a quarter of its size on top.
    long lowerBound = slabSize;
    long upperBound = slabSize + (slabSize >> 2);
    assertThat(
        worker.getMetric(Metrics.JAVA_ALLOCATION),
        allOf(greaterThan(lowerBound), lessThan(upperBound))
    );
}
}

View File

@ -210,6 +210,7 @@
"tracking_field.computercraft.http_download.name": "HTTP download",
"tracking_field.computercraft.http_requests.name": "HTTP requests",
"tracking_field.computercraft.http_upload.name": "HTTP upload",
"tracking_field.computercraft.java_allocation.name": "Java Allocations",
"tracking_field.computercraft.max": "%s (max)",
"tracking_field.computercraft.peripheral.name": "Peripheral calls",
"tracking_field.computercraft.server_tasks.name": "Server tasks",

View File

@ -210,6 +210,7 @@
"tracking_field.computercraft.http_download.name": "HTTP download",
"tracking_field.computercraft.http_requests.name": "HTTP requests",
"tracking_field.computercraft.http_upload.name": "HTTP upload",
"tracking_field.computercraft.java_allocation.name": "Java Allocations",
"tracking_field.computercraft.max": "%s (max)",
"tracking_field.computercraft.peripheral.name": "Peripheral calls",
"tracking_field.computercraft.server_tasks.name": "Server tasks",