"TISCubed" thingy

master
osmarks 2 years ago
parent 0ecadee189
commit d9933d52c5

@ -0,0 +1,410 @@
import dataclasses
import re
import typing
import operator
import functools
import enum
import collections
import typing
MEM_SIZE = 256
WAITING = object()
class Direction(enum.Enum):
UP = 0
DOWN = 1
RIGHT = 2
LEFT = 3
ANY = 8
class RunState(enum.Enum):
IDLE = 0
RUNNING = 1
BLOCKED = 2
STOPPED = 3
@dataclasses.dataclass
class Node:
# node memory
# code is loaded in at the start of memory
# the program counter is simply the last memory location
memory: bytearray = dataclasses.field(default_factory=lambda: bytearray(MEM_SIZE))
state: RunState = RunState.IDLE
input_buffer: dict[Direction, int] = dataclasses.field(default_factory=dict)
read_return_location: typing.Optional[int] = None
#output_buffer: dict[Direction, int] = dataclasses.field(default_factory=dict)
blocked_on: typing.Optional[Direction] = None
def __getitem__(self, loc):
return self.memory[loc]
def __setitem__(self, loc, value):
assert loc is not None
if isinstance(value, list):
for i, x in enumerate(value):
self.memory[loc + i] = x % 256
else:
self.memory[loc] = value % 256
def pad_list(l, n, default):
return l + [default] * (n - len(l))
def display_hex(x): return hex(x)[2:].zfill(2)
def memdump(mem):
out = []
out.append(" \x1b[31;1m" + " ".join(map(display_hex, range(16))) + "\x1b[0m")
for i in range(0, len(mem), 16):
out.append("\x1b[32;1m" + display_hex(i) + "\x1b[0m " + " ".join(map(display_hex, mem[i:i+16])))
return "\n".join(out)
@dataclasses.dataclass
class ExecutionContext:
node: Node
params: tuple[int]
io: any
@dataclasses.dataclass
class Op:
name: str
function: typing.Callable[ExecutionContext, None]
ip_advancement: int
opcodes = {}
def make_binop_wrapper(name, fn):
@functools.wraps(fn)
def wrapper(ctx):
ctx.node[ctx.params[0]] = fn(ctx.node[ctx.params[1]], ctx.node[ctx.params[2]])
return Op(name, wrapper, 4)
def nop(node, *_): pass
opcodes[0x00] = Op("NOP", nop, 1)
def test_print(ctx):
print(f"{ctx.node[-1]}: {display_hex(ctx.params[0])} = {display_hex(ctx.node[ctx.params[0]])}")
opcodes[0x01] = Op("PRINT", test_print, 2)
def mov(ctx): # MOV dest src
ctx.node[ctx.params[0]] = ctx.node[ctx.params[1]]
opcodes[0x02] = Op("MOV", mov, 3)
opcodes[0x03] = make_binop_wrapper("ADD", operator.add) # ADD dest src1 src2
def mnz(ctx): # "move if not zero"; MEZ cond dest src
if ctx.node[ctx.params[0]] != 0: ctx.node[ctx.params[1]] = ctx.node[ctx.params[2]]
opcodes[0x04] = Op("MNZ", mnz, 4)
def inc(ctx): # INC dest
ctx.node[ctx.params[0]] += 1
opcodes[0x05] = Op("INC", inc, 2)
opcodes[0x06] = make_binop_wrapper("MUL", operator.mul)
opcodes[0x07] = make_binop_wrapper("MOD", operator.mod)
opcodes[0x08] = make_binop_wrapper("DIV", operator.floordiv)
opcodes[0x09] = make_binop_wrapper("SUB", operator.sub)
opcodes[0x10] = make_binop_wrapper("OR", operator.or_)
opcodes[0x11] = make_binop_wrapper("AND", operator.and_)
opcodes[0x12] = make_binop_wrapper("SHR", operator.rshift)
opcodes[0x13] = make_binop_wrapper("SHL", operator.lshift)
def mez(ctx): # "move if equal to zero"; MEZ cond dest src
if ctx.node[ctx.params[0]] == 0: ctx.node[ctx.params[1]] = ctx.node[ctx.params[2]]
opcodes[0x14] = Op("MEZ", mez, 4)
def idm(ctx): # "indirect destination move" - destination parameter is a memory location to fetch the destination from; IDM idest src
ctx.node[ctx.node[ctx.params[0]]] = ctx.node[ctx.params[1]]
opcodes[0x15] = Op("IDM", idm, 3)
def ism(ctx): # "indirect source move"; ISM dest isrc
ctx.node[ctx.params[0]] = ctx.node[ctx.node[ctx.params[1]]]
opcodes[0x16] = Op("ISM", ism, 3)
def imv(ctx): # "indirect move" - both destination and source are indirected; IMV idest isrc
ctx.node[ctx.node[ctx.params[0]]] = ctx.node[ctx.node[ctx.params[1]]]
opcodes[0x17] = Op("IMV", imv, 3)
opcodes[0x18] = make_binop_wrapper("SADD", lambda a, b: min(a + b, 255))
opcodes[0x19] = make_binop_wrapper("SSUB", lambda a, b: max(a - b, 0))
def write(ctx):
ctx.io["write"](ctx.params[0], ctx.node[ctx.params[1]])
opcodes[0x20] = Op("WR", write, 3)
def read(ctx):
ctx.io["read"](ctx.params[0], ctx.params[1])
opcodes[0x21] = Op("RE", read, 3)
def dump(ctx):
print(memdump(ctx.node.memory))
opcodes[0xfe] = Op("DUMP", dump, 1)
def halt(ctx):
memdump(ctx.node.memory)
ctx.node.state = RunState.STOPPED
opcodes[0xff] = Op("HALT", halt, 1)
def step_node(node, io):
ip = node[-1]
instr = node[ip:ip + 4].ljust(4, b"\x00")
opcode, a, b, c = instr
op = opcodes.get(opcode)
if op:
node[-1] += op.ip_advancement
op.function(ExecutionContext(node, (a, b, c), io))
else:
print("unknown instr", " ".join(map(display_hex, instr)), "at", display_hex(ip))
print(memdump(node.memory))
node[-1] += 1
node.state = RunState.STOPPED
def flatten(xs):
for x in xs:
if isinstance(x, (list, map, filter)):
for y in x:
yield y
else:
yield x
def assemble(code):
instructions = {}
for opcode, op in opcodes.items():
instructions[op.name] = (opcode, op.ip_advancement)
out = []
# implicit "I" label for program counter for branching
labels = { "I": MEM_SIZE - 1 }
unresolved_labels = collections.defaultdict(set)
backfill = collections.defaultdict(set)
position = 0
def resolve(param):
# ! operator on params emulates this ISA having immediate parameters by
if param[0] == "!":
# add to list of values needing storage, and add current output position to list of places to update when it gets a location
backfill[param[1:]].add(position)
return
if re.match(r"[A-Za-z][A-Za-z0-9_\-]*", param): # is label
try:
return labels[param]
except KeyError: # resolve label location later
unresolved_labels[param].add(position)
return 0
else:
return int(param, 16) % 256
def write(*things):
for value in flatten(things):
if isinstance(value, int):
out.append(value)
else:
out.append(resolve(value))
nonlocal position
position = len(out)
for line in filter(lambda x: x != "", map(str.strip, code.split("\n"))):
tokens = line.split()
for index, token in enumerate(tokens):
if token.startswith("#"):
tokens = tokens[:index]
break
if len(tokens) == 0: continue
# label definition
if tokens[0].endswith(":"):
label = tokens.pop(0)[:-1]
labels[label] = position
for unresolved_loc in unresolved_labels[label]:
out[unresolved_loc] = position
del unresolved_labels[label]
if len(tokens) > 0:
ltype = tokens[0].upper()
# raw output
if ltype == "!": write(tokens[1:])
# NOP padding
elif ltype == "!PAD":
write([0] * int(tokens[1], 16))
# instruction mnemonic
else:
instr = instructions[ltype]
if instr[1] != 0: # special instructions might move it variable amounts at some point
assert len(tokens) == instr[1], f"{ltype} takes {instr[1] - 1} operands"
write(instr[0], tokens[1:])
while len(backfill) > 0:
for value, locations in list(backfill.items()):
newpos = position
write(value)
for location in locations:
out[location] = newpos
print(f"Backfilled {display_hex(newpos)}: {value}")
del backfill[value]
for k in unresolved_labels:
print("Unresolved label:", k)
if len(out) >= MEM_SIZE:
print("Code space exceeded")
return out
n = Node()
n.state = RunState.RUNNING
n[0] = assemble("""
LOOP:
inc INCBUF
add TEMP !-50 INCBUF
# debug print
#! 01 INCBUF
#wr 0 INCBUF
#re 0 INCBUF
mnz TEMP I !LOOP
halt
INCBUF: ! 1
TEMP: ! 0
OUT: ! 44
""")
n2 = Node()
n2.state = RunState.RUNNING
n2[0] = assemble("""
LOOP2:
inc INCBUF
add TEMP !-50 INCBUF
mnz TEMP I !LOOP2
LOOP:
re 8 BEE
#wr 1 BEE
#! 01 BEE
mov I !LOOP
TEMP: ! 0
BEE: ! 0
INCBUF: ! 4
""")
n3 = Node()
n3.state = RunState.RUNNING
n3[0] = assemble("""
mov M !0
mov B !0
LOOP:
sub X M !40
mez X I !DONE
wr 0 M
ism B M
inc M
wr 0 B
mov I !LOOP
DONE:
wr 0 !0FF
halt
M: ! 0
X: ! 87
B: ! 0
""")
def offset(tup, idx, by):
return tup[:idx] + (tup[idx] + by,) + tup[idx + 1:]
opposite_directions = { Direction.UP: Direction.DOWN, Direction.DOWN: Direction.UP, Direction.LEFT: Direction.RIGHT, Direction.RIGHT: Direction.LEFT }
def apply_direction(coords, dir):
if dir == Direction.UP: return offset(coords, 1, 1)
elif dir == Direction.DOWN: return offset(coords, 1, -1)
elif dir == Direction.LEFT: return offset(coords, 0, -1)
elif dir == Direction.RIGHT: return offset(coords, 0, 1)
bootloader = """
!PAD E0
LOOP:
re 8 RI # read target location from arbitrary side into buffer
add RJ RI !1
mez RJ I !0 # if target location is 255, jump to 0 (normal thing start)
re 8 RJ # read data into other buffer
idm RI RJ # transfer data into specified location
mov I !LOOP # unconditional jump back to start
RI: ! 0
RJ: ! 0
"""
bootloader_machine_code = assemble(bootloader)
def new_node():
n = Node()
print("starting node")
n.state = RunState.RUNNING
n[0] = bootloader_machine_code
return n
grid = collections.defaultdict(new_node)
grid[0, 0] = n
grid[0, 1] = n2
grid[0, 2] = n3
def write(node, orig, dir, val):
#print("WR", orig, dir, hex(val))
if dir in node.input_buffer:
print("deadlock (write) by", orig, "in", dir, "target", grid[apply_direction(orig, dir)], node.input_buffer)
node.state = RunState.BLOCKED
return
other = grid[apply_direction(orig, dir)]
opp = opposite_directions[dir]
# if the other node is waiting on communication from this node, dump data from here into memory
# and unblock it
if opp == other.blocked_on or other.blocked_on == Direction.ANY:
other.blocked_on = None
other.state = RunState.RUNNING
other[other.read_return_location] = val
# if it is not, then put data into its input buffer (it will unblock this node if it ever reads on this)
else:
other.input_buffer[opp] = val
# switch state to blocked
node.state = RunState.BLOCKED
node.blocked_on = dir
def read(node, orig, dir, ret):
#print("RE", orig, dir)
if dir == Direction.ANY and len(node.input_buffer) > 0:
rdir, val = node.input_buffer.popitem()
other = grid[apply_direction(orig, rdir)]
opp = opposite_directions[rdir]
if other.blocked_on == opp or other.blocked_on == Direction.ANY:
other.blocked_on = None
other.state = RunState.RUNNING
node[ret] = val
return
# if input already buffered
if dir in node.input_buffer:
other = grid[apply_direction(orig, dir)]
opp = opposite_directions[dir]
# remove blocking state
if other.blocked_on == opp or other.blocked_on == Direction.ANY:
other.blocked_on = None
other.state = RunState.RUNNING
# put buffered data into memory at specified return address
node[ret] = node.input_buffer[dir]
del node.input_buffer[dir]
else:
# set to blocked, put return address in node data
node.read_return_location = ret
node.state = RunState.BLOCKED
node.blocked_on = dir
while True:
ran_one = False
for coords, node in list(grid.items()):
#the reason being is that then you could play an awesome game of core-war on itprint(coords, node.state)
if node.state == RunState.RUNNING:
step_node(node, {
"write": lambda dir, val: write(node, coords, Direction(dir), val),
"read": lambda dir, ret: read(node, coords, Direction(dir), ret)
})
ran_one = True
if not ran_one: break
Loading…
Cancel
Save