diff --git a/lua-channels.lua b/lua-channels.lua index 3757194..c393803 100644 --- a/lua-channels.lua +++ b/lua-channels.lua @@ -68,12 +68,13 @@ local _M = {} local RECV = 0x1 local SEND = 0x2 local NOP = 0x3 - +local TIMEOUT = {err = "TIMEOUT"} local luajit = not not (package.loaded['jit'] and jit.version_num) -- Global objects for scheduler local tasks_runnable = {} -- list of coroutines ready to be resumed - +local tasks_to = {} -- all the timeout tasks +local altexec ---------------------------------------------------------------------------- --- Helpers @@ -113,7 +114,14 @@ local Set = { end end, - random = function(self, v) + random = function(self, to) + if to then + local arr = {} + for i = 1, #self.l do + if self.l[i].to then table.insert(arr, self.l[i]) end + end + return random_choice(arr) + end return random_choice(self.l) end, @@ -122,6 +130,8 @@ local Set = { end, } + + -- Circular Buffer data structure local CircularBuffer = { new = function(self, size) @@ -167,12 +177,22 @@ local function scheduler() local i = 0 while #tasks_runnable > 0 do local co = table.remove(tasks_runnable) + tasks_to[co] = nil local okay, emsg = coroutine.resume(co) if not okay then error(emsg) end i = i + 1 end + + local now = os.time() + for co, alt in pairs(tasks_to) do + if alt and now >= alt.to then + altexec(alt) + tasks_to[co] = nil + alt.c:_get_alts(RECV):remove(alt) + end + end return i end @@ -224,7 +244,17 @@ local function altcopy(a, b) -- Otherwise it's always okay to receive and then send. if r ~= nil then - r.alt_array.value = c._buf:pop() + if r.to then + r.alt_array.value = TIMEOUT + r.alt_array.resolved = 1 + return true + elseif r.closed then + r.alt_array.value = nil + r.alt_array.resolved = 1 + return true + else + r.alt_array.value = c._buf:pop() + end end if s ~= nil then c._buf:push(s.p) @@ -259,17 +289,20 @@ local function altcanexec(a) end -- Alt can be execed so find a counterpart Alt and exec it! -local function altexec(a) +altexec = function (a) local c, op = a.c, a.op local other_alts = c:_get_other_alts(op) - local other_a = other_alts:random() + local other_a = other_alts:random(a.to) -- other_a may be nil - altcopy(a, other_a) + local isend = altcopy(a, other_a) + if other_a ~= nil then -- Disengage from channels used by the other Alt and make it ready. altalldequeue(other_a.alt_array) other_a.alt_array.resolved = other_a.alt_index task_ready(other_a.alt_array.task) + elseif isend then + task_ready(a.alt_array.task) end end @@ -291,13 +324,19 @@ local function chanalt(alt_array, canblock) "pass valid channel to a c field of alt") if altcanexec(a) == true then table.insert(list_of_canexec_i, i) + elseif a.to then + + local sc = coroutine.running() + if not tasks_to[sc] then + tasks_to[sc] = a + end end end if #list_of_canexec_i > 0 then local i = random_choice(list_of_canexec_i) altexec(alt_array[i]) - return i, alt_array.value + return i, alt_array.value, alt_array.closed == nil end if canblock ~= true then @@ -322,7 +361,7 @@ local function chanalt(alt_array, canblock) assert(alt_array.resolved > 0) local r = alt_array.resolved - return r, alt_array.value + return r, alt_array.value, alt_array.closed == nil end @@ -342,11 +381,11 @@ local Channel = { return true end, - recv = function(self) - local alts = {{c = self, op = RECV}} + recv = function(self, to) + local alts = {{c = self, op = RECV, to = to and os.time() + to or nil}} local s, msg = chanalt(alts, true) assert(s == 1) - return msg + return msg, alts[1].closed == nil end, nbsend = function(self, msg) @@ -359,6 +398,14 @@ local Channel = { return s == 1, msg end, + close = function(self) + local alts = self:_get_alts(RECV) + for _, v in ipairs(alts.l) do + v.closed = true + altexec(v) + end + end, + _get_alts = function(self, op) return (op == RECV) and self._recv_alts or self._send_alts end, @@ -391,7 +438,7 @@ _M.chanalt = chanalt _M.RECV = RECV _M.SEND = SEND _M.NOP = NOP - +_M.Error = {TIMEOUT = TIMEOUT} ---------------------------------------------------------------------------- ---------------------------------------------------------------------------- -- Tests @@ -453,7 +500,7 @@ local tests = { local function a(c, name) -- Blocking send and recv from the same process local alt = {{c = c, op = task.SEND, p = 1}, - {c = c, op = task.RECV}} + {c = c, op = task.RECV}} local i, v = task.chanalt(alt, true) local k = string.format('%s %s', name, i == 1 and "send" or "recv") l[k] = (l[k] or 0) + 1 @@ -639,6 +686,108 @@ local tests = { assert(done) end, + close_test = function() + local values = {} + for i = 1, 1000 do table.insert(values, i) end + local chan = task.Channel:new() + local done = task.Channel:new() + task.spawn(function() + local i = 0 + while true do + local msg, more = chan:recv() + if not more then + break + else + i = i + 1 + end + end + assert(i == 200) + done:send(1) + end) + + task.spawn(function() + for i =1, 200 do + chan:send(i) + end + chan:close() + end) + + task.scheduler() + end, + + recv_timeout = function() + print("\t testing... (this will block 8 seconds)") + -- use copas to sleep + local status, copas = pcall(require, "copas") + if not status then + print("\t no copas install skip testing...") + return + end + + -- testing data + local values = {} + for i = 1, 1001 do + table.insert(values, i) + end + + local exists = function(t, v) + for _, val in ipairs(t) do + if val == v then return true end + end + return false + end + + + local chan = task.Channel:new(1000) + + -- we have three reader, only one will timeout + local r1 = function() + for i = 1, 500 do + local v = chan:recv() + assert(exists(values, v)) + end + end + + local r2 = function() + for i =1, 500 do + local v = chan:recv() + assert(exists(values, v)) + end + end + + local r3 = function() + local v = chan:recv(1) + assert(task.Error.TIMEOUT == v) + end + + local r4 = function() + local v = chan:recv(6) + assert(exists(values, v)) + end + + local sender = function() + copas.sleep(5) + for _, v in ipairs(values) do + chan:send(v) + end + end + + task.spawn(r1) + task.spawn(r2) + task.spawn(r3) + task.spawn(r4) + copas.addthread(sender) + + local done + done = true + + for i = 1, 80 do + copas.step(0.1) + task.scheduler() + end + assert(done) + end + } -- No parameters: run tests