From 58677136780f2ef7670d5b4b250c1439ecfeb9a7 Mon Sep 17 00:00:00 2001 From: Liulijin Date: Thu, 25 Dec 2014 19:48:21 +0800 Subject: [PATCH 1/2] add recv timeout support --- lua-channels.lua | 131 +++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 120 insertions(+), 11 deletions(-) diff --git a/lua-channels.lua b/lua-channels.lua index 3757194..b740de3 100644 --- a/lua-channels.lua +++ b/lua-channels.lua @@ -68,12 +68,13 @@ local _M = {} local RECV = 0x1 local SEND = 0x2 local NOP = 0x3 - +local TIMEOUT = {err = "TIMEOUT"} local luajit = not not (package.loaded['jit'] and jit.version_num) -- Global objects for scheduler local tasks_runnable = {} -- list of coroutines ready to be resumed - +local tasks_to = {} -- all the timeout tasks +local altexec ---------------------------------------------------------------------------- --- Helpers @@ -113,7 +114,14 @@ local Set = { end end, - random = function(self, v) + random = function(self, to) + if to then + local arr = {} + for i = 1, #self.l do + if self.l[i].to then table.insert(arr, self.l[i]) end + end + return random_choice(arr) + end return random_choice(self.l) end, @@ -122,6 +130,8 @@ local Set = { end, } + + -- Circular Buffer data structure local CircularBuffer = { new = function(self, size) @@ -167,12 +177,22 @@ local function scheduler() local i = 0 while #tasks_runnable > 0 do local co = table.remove(tasks_runnable) + tasks_to[co] = nil local okay, emsg = coroutine.resume(co) if not okay then error(emsg) end i = i + 1 end + + local now = os.time() + for co, alt in pairs(tasks_to) do + if alt and now >= alt.to then + altexec(alt) + tasks_to[co] = nil + alt.c:_get_alts(RECV):remove(alt) + end + end return i end @@ -224,7 +244,13 @@ local function altcopy(a, b) -- Otherwise it's always okay to receive and then send. if r ~= nil then - r.alt_array.value = c._buf:pop() + if r.to then + r.alt_array.value = TIMEOUT + r.alt_array.resolved = 1 + return true + else + r.alt_array.value = c._buf:pop() + end end if s ~= nil then c._buf:push(s.p) @@ -259,17 +285,20 @@ local function altcanexec(a) end -- Alt can be execed so find a counterpart Alt and exec it! -local function altexec(a) +altexec = function (a) local c, op = a.c, a.op local other_alts = c:_get_other_alts(op) - local other_a = other_alts:random() + local other_a = other_alts:random(a.to) -- other_a may be nil - altcopy(a, other_a) + local isto = altcopy(a, other_a) + if other_a ~= nil then -- Disengage from channels used by the other Alt and make it ready. altalldequeue(other_a.alt_array) other_a.alt_array.resolved = other_a.alt_index task_ready(other_a.alt_array.task) + elseif isto then + task_ready(a.alt_array.task) end end @@ -291,6 +320,12 @@ local function chanalt(alt_array, canblock) "pass valid channel to a c field of alt") if altcanexec(a) == true then table.insert(list_of_canexec_i, i) + elseif a.to then + + local sc = coroutine.running() + if not tasks_to[sc] then + tasks_to[sc] = a + end end end @@ -342,8 +377,8 @@ local Channel = { return true end, - recv = function(self) - local alts = {{c = self, op = RECV}} + recv = function(self, to) + local alts = {{c = self, op = RECV, to = to and os.time() + to or nil}} local s, msg = chanalt(alts, true) assert(s == 1) return msg @@ -391,7 +426,7 @@ _M.chanalt = chanalt _M.RECV = RECV _M.SEND = SEND _M.NOP = NOP - +_M.Error = {TIMEOUT = TIMEOUT} ---------------------------------------------------------------------------- ---------------------------------------------------------------------------- -- Tests @@ -453,7 +488,7 @@ local tests = { local function a(c, name) -- Blocking send and recv from the same process local alt = {{c = c, op = task.SEND, p = 1}, - {c = c, op = task.RECV}} + {c = c, op = task.RECV}} local i, v = task.chanalt(alt, true) local k = string.format('%s %s', name, i == 1 and "send" or "recv") l[k] = (l[k] or 0) + 1 @@ -639,6 +674,80 @@ local tests = { assert(done) end, + recv_timeout = function() + print("\t testing... (this will block 8 seconds)") + -- use copas to sleep + local status, copas = pcall(require, "copas") + if not status then + print("\t no copas install skip testing...") + return + end + + -- testing data + local values = {} + for i = 1, 1001 do + table.insert(values, i) + end + + local exists = function(t, v) + for _, val in ipairs(t) do + if val == v then return true end + end + return false + end + + + local chan = task.Channel:new(1000) + + -- we have three reader, only one will timeout + local r1 = function() + for i = 1, 500 do + local v = chan:recv() + assert(exists(values, v)) + end + end + + local r2 = function() + for i =1, 500 do + local v = chan:recv() + assert(exists(values, v)) + end + end + + local r3 = function() + local v = chan:recv(1) + assert(task.Error.TIMEOUT == v) + end + + local r4 = function() + local v = chan:recv(6) + print(v) + assert(exists(values, v)) + end + + local sender = function() + copas.sleep(5) + for _, v in ipairs(values) do + chan:send(v) + end + end + + task.spawn(r1) + task.spawn(r2) + task.spawn(r3) + task.spawn(r4) + copas.addthread(sender) + + local done + done = true + + for i = 1, 80 do + copas.step(0.1) + task.scheduler() + end + assert(done) + end + } -- No parameters: run tests From 703b772d80442503b71e4a89aa51a6f7b1f76da9 Mon Sep 17 00:00:00 2001 From: Liulijin Date: Sun, 4 Jan 2015 10:51:16 +0800 Subject: [PATCH 2/2] add close support --- lua-channels.lua | 52 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 46 insertions(+), 6 deletions(-) diff --git a/lua-channels.lua b/lua-channels.lua index b740de3..c393803 100644 --- a/lua-channels.lua +++ b/lua-channels.lua @@ -248,6 +248,10 @@ local function altcopy(a, b) r.alt_array.value = TIMEOUT r.alt_array.resolved = 1 return true + elseif r.closed then + r.alt_array.value = nil + r.alt_array.resolved = 1 + return true else r.alt_array.value = c._buf:pop() end @@ -290,14 +294,14 @@ altexec = function (a) local other_alts = c:_get_other_alts(op) local other_a = other_alts:random(a.to) -- other_a may be nil - local isto = altcopy(a, other_a) + local isend = altcopy(a, other_a) if other_a ~= nil then -- Disengage from channels used by the other Alt and make it ready. altalldequeue(other_a.alt_array) other_a.alt_array.resolved = other_a.alt_index task_ready(other_a.alt_array.task) - elseif isto then + elseif isend then task_ready(a.alt_array.task) end end @@ -332,7 +336,7 @@ local function chanalt(alt_array, canblock) if #list_of_canexec_i > 0 then local i = random_choice(list_of_canexec_i) altexec(alt_array[i]) - return i, alt_array.value + return i, alt_array.value, alt_array.closed == nil end if canblock ~= true then @@ -357,7 +361,7 @@ local function chanalt(alt_array, canblock) assert(alt_array.resolved > 0) local r = alt_array.resolved - return r, alt_array.value + return r, alt_array.value, alt_array.closed == nil end @@ -381,7 +385,7 @@ local Channel = { local alts = {{c = self, op = RECV, to = to and os.time() + to or nil}} local s, msg = chanalt(alts, true) assert(s == 1) - return msg + return msg, alts[1].closed == nil end, nbsend = function(self, msg) @@ -394,6 +398,14 @@ local Channel = { return s == 1, msg end, + close = function(self) + local alts = self:_get_alts(RECV) + for _, v in ipairs(alts.l) do + v.closed = true + altexec(v) + end + end, + _get_alts = function(self, op) return (op == RECV) and self._recv_alts or self._send_alts end, @@ -674,6 +686,35 @@ local tests = { assert(done) end, + close_test = function() + local values = {} + for i = 1, 1000 do table.insert(values, i) end + local chan = task.Channel:new() + local done = task.Channel:new() + task.spawn(function() + local i = 0 + while true do + local msg, more = chan:recv() + if not more then + break + else + i = i + 1 + end + end + assert(i == 200) + done:send(1) + end) + + task.spawn(function() + for i =1, 200 do + chan:send(i) + end + chan:close() + end) + + task.scheduler() + end, + recv_timeout = function() print("\t testing... (this will block 8 seconds)") -- use copas to sleep @@ -721,7 +762,6 @@ local tests = { local r4 = function() local v = chan:recv(6) - print(v) assert(exists(values, v)) end