1
0
mirror of https://github.com/kepler155c/opus synced 2025-01-23 05:36:53 +00:00
opus/sys/network/transport.lua

138 lines
3.4 KiB
Lua
Raw Normal View History

2017-05-09 05:57:00 +00:00
--[[
2018-01-24 22:39:38 +00:00
Low level socket protocol implementation.
2017-05-09 05:57:00 +00:00
2018-01-24 22:39:38 +00:00
* sequencing
* background read buffering
2017-05-09 05:57:00 +00:00
]]--
2018-01-13 20:17:26 +00:00
local Event = require('event')
2017-10-25 03:01:40 +00:00
2018-01-13 20:17:26 +00:00
local os = _G.os
2017-05-09 05:57:00 +00:00
local computerId = os.getComputerID()
2017-10-25 03:01:40 +00:00
local transport = {
2018-01-24 22:39:38 +00:00
timers = { },
sockets = { },
UID = 0,
2017-05-09 05:57:00 +00:00
}
2017-10-25 03:01:40 +00:00
_G.transport = transport
2017-05-09 05:57:00 +00:00
function transport.open(socket)
2018-01-24 22:39:38 +00:00
transport.UID = transport.UID + 1
2018-01-20 12:18:13 +00:00
2018-01-24 22:39:38 +00:00
transport.sockets[socket.sport] = socket
socket.activityTimer = os.clock()
socket.uid = transport.UID
2017-05-09 05:57:00 +00:00
end
function transport.read(socket)
2018-01-24 22:39:38 +00:00
local data = table.remove(socket.messages, 1)
if data then
return unpack(data)
end
2017-05-09 05:57:00 +00:00
end
function transport.write(socket, data)
2018-10-31 04:05:29 +00:00
--_debug('>> ' .. Util.tostring({ type = 'DATA', seq = socket.wseq }))
2018-01-24 22:39:38 +00:00
socket.transmit(socket.dport, socket.dhost, data)
2017-05-09 05:57:00 +00:00
2018-01-24 22:39:38 +00:00
--local timerId = os.startTimer(3)
2017-05-09 05:57:00 +00:00
2018-01-24 22:39:38 +00:00
--transport.timers[timerId] = socket
--socket.timers[socket.wseq] = timerId
2017-05-09 05:57:00 +00:00
2018-01-24 22:39:38 +00:00
socket.wseq = socket.wseq + 1
2017-05-09 05:57:00 +00:00
end
2018-01-07 03:25:33 +00:00
function transport.ping(socket)
2018-10-31 04:05:29 +00:00
--_debug('>> ' .. Util.tostring({ type = 'DATA', seq = socket.wseq }))
2018-01-24 22:39:38 +00:00
if os.clock() - socket.activityTimer > 10 then
socket.activityTimer = os.clock()
socket.transmit(socket.dport, socket.dhost, {
type = 'PING',
seq = -1,
})
2018-02-08 07:32:28 +00:00
local timerId = os.startTimer(5)
2018-01-24 22:39:38 +00:00
transport.timers[timerId] = socket
socket.timers[-1] = timerId
end
2018-01-07 03:25:33 +00:00
end
2017-05-09 05:57:00 +00:00
function transport.close(socket)
2018-01-24 22:39:38 +00:00
transport.sockets[socket.sport] = nil
2017-05-09 05:57:00 +00:00
end
2018-01-13 20:17:26 +00:00
Event.on('timer', function(_, timerId)
2018-01-24 22:39:38 +00:00
local socket = transport.timers[timerId]
2018-01-07 03:25:33 +00:00
2018-01-24 22:39:38 +00:00
if socket and socket.connected then
print('transport timeout - closing socket ' .. socket.sport)
socket:close()
transport.timers[timerId] = nil
end
2018-01-13 20:17:26 +00:00
end)
2017-05-09 05:57:00 +00:00
2018-01-13 20:17:26 +00:00
Event.on('modem_message', function(_, _, dport, dhost, msg, distance)
2018-03-30 17:12:46 +00:00
if dhost == computerId and type(msg) == 'table' then
2018-01-24 22:39:38 +00:00
local socket = transport.sockets[dport]
if socket and socket.connected then
2018-10-31 04:05:29 +00:00
--if msg.type then _debug('<< ' .. Util.tostring(msg)) end
2018-12-02 23:40:19 +00:00
if socket.co and coroutine.status(socket.co) == 'dead' then
_G._debug('socket coroutine dead')
socket:close()
2018-01-24 22:39:38 +00:00
2018-12-02 23:40:19 +00:00
elseif msg.type == 'DISC' then
2018-01-24 22:39:38 +00:00
-- received disconnect from other end
if socket.connected then
os.queueEvent('transport_' .. socket.uid)
end
socket.connected = false
socket:close()
elseif msg.type == 'ACK' then
local ackTimerId = socket.timers[msg.seq]
if ackTimerId then
os.cancelTimer(ackTimerId)
socket.timers[msg.seq] = nil
socket.activityTimer = os.clock()
transport.timers[ackTimerId] = nil
end
elseif msg.type == 'PING' then
socket.activityTimer = os.clock()
socket.transmit(socket.dport, socket.dhost, {
type = 'ACK',
seq = msg.seq,
})
elseif msg.type == 'DATA' and msg.data then
socket.activityTimer = os.clock()
if msg.seq ~= socket.rseq then
print('transport seq error - closing socket ' .. socket.sport)
2018-10-31 04:05:29 +00:00
_debug(msg.data)
_debug('current ' .. socket.rseq)
_debug('expected ' .. msg.seq)
2018-02-08 07:55:14 +00:00
-- socket:close()
-- os.queueEvent('transport_' .. socket.uid)
2018-01-24 22:39:38 +00:00
else
socket.rseq = socket.rseq + 1
table.insert(socket.messages, { msg.data, distance })
-- use resume instead ??
if not socket.messages[2] then -- table size is 1
os.queueEvent('transport_' .. socket.uid)
end
2018-10-31 04:05:29 +00:00
--_debug('>> ' .. Util.tostring({ type = 'ACK', seq = msg.seq }))
2018-01-24 22:39:38 +00:00
--socket.transmit(socket.dport, socket.dhost, {
-- type = 'ACK',
-- seq = msg.seq,
--})
end
end
end
end
2018-01-13 20:17:26 +00:00
end)