Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 163 additions & 14 deletions lua-channels.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ local _M = {}
local RECV = 0x1
local SEND = 0x2
local NOP = 0x3

local TIMEOUT = {err = "TIMEOUT"}
local luajit = not not (package.loaded['jit'] and jit.version_num)

-- Global objects for scheduler
local tasks_runnable = {} -- list of coroutines ready to be resumed

local tasks_to = {} -- all the timeout tasks
local altexec

----------------------------------------------------------------------------
--- Helpers
Expand Down Expand Up @@ -113,7 +114,14 @@ local Set = {
end
end,

random = function(self, v)
random = function(self, to)
if to then
local arr = {}
for i = 1, #self.l do
if self.l[i].to then table.insert(arr, self.l[i]) end
end
return random_choice(arr)
end
return random_choice(self.l)
end,

Expand All @@ -122,6 +130,8 @@ local Set = {
end,
}



-- Circular Buffer data structure
local CircularBuffer = {
new = function(self, size)
Expand Down Expand Up @@ -167,12 +177,22 @@ local function scheduler()
local i = 0
while #tasks_runnable > 0 do
local co = table.remove(tasks_runnable)
tasks_to[co] = nil
local okay, emsg = coroutine.resume(co)
if not okay then
error(emsg)
end
i = i + 1
end

local now = os.time()
for co, alt in pairs(tasks_to) do
if alt and now >= alt.to then
altexec(alt)
tasks_to[co] = nil
alt.c:_get_alts(RECV):remove(alt)
end
end
return i
end

Expand Down Expand Up @@ -224,7 +244,17 @@ local function altcopy(a, b)

-- Otherwise it's always okay to receive and then send.
if r ~= nil then
r.alt_array.value = c._buf:pop()
if r.to then
r.alt_array.value = TIMEOUT
r.alt_array.resolved = 1
return true
elseif r.closed then
r.alt_array.value = nil
r.alt_array.resolved = 1
return true
else
r.alt_array.value = c._buf:pop()
end
end
if s ~= nil then
c._buf:push(s.p)
Expand Down Expand Up @@ -259,17 +289,20 @@ local function altcanexec(a)
end

-- Alt can be execed so find a counterpart Alt and exec it!
local function altexec(a)
altexec = function (a)
local c, op = a.c, a.op
local other_alts = c:_get_other_alts(op)
local other_a = other_alts:random()
local other_a = other_alts:random(a.to)
-- other_a may be nil
altcopy(a, other_a)
local isend = altcopy(a, other_a)

if other_a ~= nil then
-- Disengage from channels used by the other Alt and make it ready.
altalldequeue(other_a.alt_array)
other_a.alt_array.resolved = other_a.alt_index
task_ready(other_a.alt_array.task)
elseif isend then
task_ready(a.alt_array.task)
end
end

Expand All @@ -291,13 +324,19 @@ local function chanalt(alt_array, canblock)
"pass valid channel to a c field of alt")
if altcanexec(a) == true then
table.insert(list_of_canexec_i, i)
elseif a.to then

local sc = coroutine.running()
if not tasks_to[sc] then
tasks_to[sc] = a
end
end
end

if #list_of_canexec_i > 0 then
local i = random_choice(list_of_canexec_i)
altexec(alt_array[i])
return i, alt_array.value
return i, alt_array.value, alt_array.closed == nil
end

if canblock ~= true then
Expand All @@ -322,7 +361,7 @@ local function chanalt(alt_array, canblock)
assert(alt_array.resolved > 0)

local r = alt_array.resolved
return r, alt_array.value
return r, alt_array.value, alt_array.closed == nil
end


Expand All @@ -342,11 +381,11 @@ local Channel = {
return true
end,

recv = function(self)
local alts = {{c = self, op = RECV}}
recv = function(self, to)
local alts = {{c = self, op = RECV, to = to and os.time() + to or nil}}
local s, msg = chanalt(alts, true)
assert(s == 1)
return msg
return msg, alts[1].closed == nil
end,

nbsend = function(self, msg)
Expand All @@ -359,6 +398,14 @@ local Channel = {
return s == 1, msg
end,

close = function(self)
local alts = self:_get_alts(RECV)
for _, v in ipairs(alts.l) do
v.closed = true
altexec(v)
end
end,

_get_alts = function(self, op)
return (op == RECV) and self._recv_alts or self._send_alts
end,
Expand Down Expand Up @@ -391,7 +438,7 @@ _M.chanalt = chanalt
_M.RECV = RECV
_M.SEND = SEND
_M.NOP = NOP

_M.Error = {TIMEOUT = TIMEOUT}
----------------------------------------------------------------------------
----------------------------------------------------------------------------
-- Tests
Expand Down Expand Up @@ -453,7 +500,7 @@ local tests = {
local function a(c, name)
-- Blocking send and recv from the same process
local alt = {{c = c, op = task.SEND, p = 1},
{c = c, op = task.RECV}}
{c = c, op = task.RECV}}
local i, v = task.chanalt(alt, true)
local k = string.format('%s %s', name, i == 1 and "send" or "recv")
l[k] = (l[k] or 0) + 1
Expand Down Expand Up @@ -639,6 +686,108 @@ local tests = {
assert(done)
end,

close_test = function()
local values = {}
for i = 1, 1000 do table.insert(values, i) end
local chan = task.Channel:new()
local done = task.Channel:new()
task.spawn(function()
local i = 0
while true do
local msg, more = chan:recv()
if not more then
break
else
i = i + 1
end
end
assert(i == 200)
done:send(1)
end)

task.spawn(function()
for i =1, 200 do
chan:send(i)
end
chan:close()
end)

task.scheduler()
end,

recv_timeout = function()
print("\t testing... (this will block 8 seconds)")
-- use copas to sleep
local status, copas = pcall(require, "copas")
if not status then
print("\t no copas install skip testing...")
return
end

-- testing data
local values = {}
for i = 1, 1001 do
table.insert(values, i)
end

local exists = function(t, v)
for _, val in ipairs(t) do
if val == v then return true end
end
return false
end


local chan = task.Channel:new(1000)

-- we have three reader, only one will timeout
local r1 = function()
for i = 1, 500 do
local v = chan:recv()
assert(exists(values, v))
end
end

local r2 = function()
for i =1, 500 do
local v = chan:recv()
assert(exists(values, v))
end
end

local r3 = function()
local v = chan:recv(1)
assert(task.Error.TIMEOUT == v)
end

local r4 = function()
local v = chan:recv(6)
assert(exists(values, v))
end

local sender = function()
copas.sleep(5)
for _, v in ipairs(values) do
chan:send(v)
end
end

task.spawn(r1)
task.spawn(r2)
task.spawn(r3)
task.spawn(r4)
copas.addthread(sender)

local done
done = true

for i = 1, 80 do
copas.step(0.1)
task.scheduler()
end
assert(done)
end

}

-- No parameters: run tests
Expand Down