packages1/multitask/lib/multitask.lua

175 lines
4.4 KiB
Lua

local mt = {}
mt.newPool = function(pool_options)
pool_options = pool_options or {}
local pool = {}
pool.threads = {} -- indexed by coroutines, contains their metadata
pool.namedThreads = {} -- indexed by coroutine names, contains their metadata also.
pool.byID = {} -- aaaa bees
-- coroutines are named if their metadata has a name field.
pool.threadcount = 0 -- used for detecting when all things in a pool are depleted
pool.id = math.random(1,99999999999999)
pool.clear = function()
pool.threads = {}
pool.namedThreads = {}
pool.byID = {}
pool.threadcount = 0
end
local idcount = 0
pool.add = function(fn, options)
options = options or {}
if not fn then error("expected function",2) end
options.co = coroutine.create(function() local retv = fn(options) if retv~=nil then options.retv=retv end os.queueEvent("ignore_this") end)
options.fn = fn
options.id = idcount
idcount = idcount + 1
pool.threads[options.co] = options
if options.name then
pool.namedThreads[options.name] = options
end
pool.byID[options.id] = options
pool.threadcount = pool.threadcount + 1
os.queueEvent("pool_fn_added",options,pool)
return options
end
pool.rm = function(name)
local tr = nil
if type(name) == "string" then
if not pool.namedThreads[name] then
error("no thread with name "..name)
end
tr = pool.namedThreads[name]
elseif type(name)=="number" then
if not pool.byID[name] then
error("no thread with ID "..name)
end
tr = pool.byID[name]
else
tr = name
end
tr = pool.byID[tr.id]
os.queueEvent("pool_fn_end",tr.id,pool.id)
pool.threads[tr.co] = nil
if type(name)=="string" then pool.namedThreads[name] = nil end
pool.byID[name] = nil
pool.threadcount = pool.threadcount - 1
return true
end
pool.addFile = function(filename, options)
pool.add(function()
dofile(filename)
end,options)
end
pool.run = function(terminable)
terminable = terminable == true -- if false, terminate events will be echoed to coroutines instead of
-- terminating this pool
while true do
local event = {coroutine.yield()}
if event[1] == "terminate" and terminable then
return
end
for thread, options in pairs(pool.threads) do
if coroutine.status(thread) ~= "dead" then
if event[1] == options.filter or options.filter == nil then
local x,e = coroutine.resume(thread,unpack(event))
if not x then
os.queueEvent("pool_fn_crash",options.id,pool.id,e)
pool.rm(options)
if pool_options.crash_on_error then
error(e)
end
else
options.filter = e
end
end
else
pool.rm(options)
end
end
if pool.threadcount == 0 then return end
end
end
pool.await = function(...)
local args = {...}
local retv = {}
for i,v in ipairs(args) do
local to_match = nil
if type(v) == "table" then
to_match = v.id
elseif type(v) == "thread" then
to_match = pool.threads[v].id
elseif type(name)=="number" then
if not pool.byID[name] then
error("no thread with ID "..name)
end
to_match = name
else
if not pool.namedThreads[v] then
error("no thread named "..v)
return
end
to_match = pool.namedThreads[v].id
end
if coroutine.status(pool.byID[to_match].co) == "dead" then
table.insert(retv, pool.byID[to_match].retv)
else
while true do
local e, b, c, d = os.pullEvent()
if (e=="pool_fn_end" or e=="pool_fn_crash") then
if c == pool.id and b == to_match then
table.insert(retv, pool.byID[b].retv)
break
end
end
end
end
end
return unpack(retv)
end
pool.await_any = function(...)
local args = {...}
local match = {}
for i,v in ipairs(args) do
local to_match = nil
if type(v) == "table" then
to_match = v.id
elseif type(name)=="number" then
if not pool.byID[name] then
error("no thread with ID "..name)
end
to_match = name
else
if not pool.namedThreads[v] then
error("no thread named "..v)
return
end
to_match = pool.namedThreads[v].id
end
if coroutine.status(pool.byID[to_match].co) == "dead" then
return pool.byID[to_match].retv
end
table.insert(match,to_match)
end
while true do
local e, b, c, d = os.pullEvent()
if e=="pool_fn_end" and c == pool.id then
for i,v in ipairs(match) do
if b == v then
return pool.byID[v].retv
end
end
end
end
end
return pool
end
return mt