Skip to content

Commit aaa23f1

Browse files
committed
add unittest for partial order
1 parent 60ba84d commit aaa23f1

3 files changed

Lines changed: 186 additions & 21 deletions

File tree

jobq/jobq.py

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ class Finish(object):
2222
pass
2323

2424

25+
class NotMatch(object):
26+
pass
27+
28+
2529
class JobWorkerError(Exception):
2630
pass
2731

@@ -176,18 +180,18 @@ def _exec(self, input_q, output_q, thread_index):
176180
with self.probe['probe_lock']:
177181
self.probe['out'] += 1
178182

179-
if self.partial_order:
183+
# If rst is an iterator, it procures more than one args to next job.
184+
# In order to be accurate, we only count an iterator as one.
185+
186+
_put_rst(output_q, rst)
187+
188+
if self.partial_order and args is not None:
180189
k = str(args[0])
181190
with self.buffer_lock:
182191
if k in self.in_working:
183192
del self.in_working[k]
184193
self.buffer_queue.not_match.set()
185194

186-
# If rst is an iterator, it procures more than one args to next job.
187-
# In order to be accurate, we only count an iterator as one.
188-
189-
_put_rst(output_q, rst)
190-
191195
def _exec_in_order(self, input_q, output_q, thread_index):
192196

193197
while self.running:
@@ -270,16 +274,22 @@ def check_expired():
270274
args = self.buffer_queue.get(exclude=self.in_working)
271275
if args is Finish:
272276
return
273-
elif args is None:
277+
elif args is NotMatch:
274278
self.buffer_queue.not_match.wait()
279+
self.buffer_queue.not_match.clear()
275280
else:
276-
with self.buffer_lock:
277-
self.in_working[str(args[0])] = time.time()
281+
if args is not None:
282+
with self.buffer_lock:
283+
self.in_working[str(args[0])] = time.time()
284+
278285
_put_rst(self.input_queue, args)
279286

280287
class JobManager(object):
281288

282-
def __init__(self, workers, queue_size=1024, expire=3000, probe=None, keep_order=False, partial_order=False):
289+
def __init__(self, workers, queue_size=1024, expire=None, probe=None, keep_order=False, partial_order=False):
290+
291+
if expire is None:
292+
expire = 60 * 5
283293

284294
if probe is None:
285295
probe = {}
@@ -489,10 +499,8 @@ def get(self, block=True, timeout=None, exclude=None):
489499
raise Queue.Empty
490500
self.not_empty.wait(remaining)
491501
item = self._get(exclude)
492-
if item is not None:
502+
if item is not NotMatch:
493503
self.not_full.notify()
494-
else:
495-
self.not_match.clear()
496504
return item
497505

498506
def _qsize(self):
@@ -502,8 +510,10 @@ def _put(self, item):
502510
self.queue.append(item)
503511

504512
def _get(self, exclude):
505-
item = None
513+
item = NotMatch
506514
index = None
515+
selected = {}
516+
507517
for i, v in enumerate(self.queue):
508518
if v is Finish:
509519
if i == 0:
@@ -514,9 +524,13 @@ def _get(self, exclude):
514524
break
515525
else:
516526
key = str(v[0])
527+
if key in selected:
528+
continue
517529
if not exclude or key not in exclude:
518530
index, item = i, v
519531
break
532+
selected[key] = True
533+
520534
if index is not None:
521535
del self.queue[index]
522536
return item

jobq/t.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/sh
22

3-
python2 -m unittest -v discover
3+
python2 -m unittest discover -v
44

55
# python2 -m unittest -v test.test_jobq
66
# python2 -m unittest -v test.test_jobq.TestProbe

jobq/test/test_jobq.py

Lines changed: 157 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1+
#!/usr/bin/env python2
2+
# coding: utf-8
3+
14
import logging
25
import random
36
import threading
47
import time
58
import unittest
69

10+
711
from pykit import jobq
812
from pykit import threadutil
913
from pykit import ututil
@@ -36,9 +40,25 @@ def sleep_5(args):
3640
return args
3741

3842

43+
def is_partial_order(rst):
44+
cache = {}
45+
46+
for item in rst:
47+
k = item[0]
48+
if k not in cache:
49+
cache[k] = item
50+
51+
last = cache[k]
52+
if last[1] > item[1]:
53+
return False
54+
cache[k] = item
55+
56+
return True
57+
58+
3959
class TestProbe(unittest.TestCase):
4060

41-
def _start_jobq_in_thread(self, n_items, n_worker, keep_order=False):
61+
def _start_jobq_in_thread(self, items, n_worker, keep_order=False, partial_order=False):
4262

4363
def _sleep_1(args):
4464
time.sleep(0.1)
@@ -48,11 +68,12 @@ def _nothing(args):
4868
return args
4969

5070
probe = {}
51-
th = threading.Thread(target=lambda: jobq.run(range(n_items),
71+
th = threading.Thread(target=lambda: jobq.run(items,
5272
[(_sleep_1, n_worker),
5373
_nothing],
5474
probe=probe,
5575
keep_order=keep_order,
76+
partial_order=partial_order,
5677
))
5778
th.daemon = True
5879
th.start()
@@ -67,13 +88,12 @@ def test_probe_single_thread(self):
6788
(0.2, 0, 'all done'),
6889
)
6990

70-
th, probe = self._start_jobq_in_thread(3, 1)
91+
th, probe = self._start_jobq_in_thread(range(3), 1)
7192

7293
for sleep_time, doing, case_mes in cases:
7394

7495
time.sleep(sleep_time)
7596
stat = jobq.stat(probe)
76-
7797
self.assertEqual(doing, stat['doing'], case_mes)
7898

7999
# qsize() is not reliable. do not test the value of it.
@@ -100,7 +120,7 @@ def test_probe_3_thread(self):
100120
(0.4, 0, 'all done'),
101121
)
102122

103-
th, probe = self._start_jobq_in_thread(10, 3)
123+
th, probe = self._start_jobq_in_thread(range(10), 3)
104124

105125
for sleep_time, doing, case_mes in cases:
106126

@@ -124,7 +144,7 @@ def test_probe_3_thread_keep_order(self):
124144
(0.4, 0, 'all done'),
125145
)
126146

127-
th, probe = self._start_jobq_in_thread(10, 3, keep_order=True)
147+
th, probe = self._start_jobq_in_thread(range(10), 3, keep_order=True)
128148

129149
for sleep_time, doing, case_mes in cases:
130150

@@ -140,6 +160,28 @@ def test_probe_3_thread_keep_order(self):
140160

141161
th.join()
142162

163+
def test_probe_3_thread_partial_order(self):
164+
cases = (
165+
(0.05, 3, '_sleep_1 is working on 1st 3 items'),
166+
(0.1, 3, '_sleep_1 is working on 2nd 3 items'),
167+
(0.4, 0, 'all done'),
168+
)
169+
170+
th, probe = self._start_jobq_in_thread(([i, 0] for i in range(10)), 3, partial_order=True)
171+
172+
for sleep_time, doing, case_mes in cases:
173+
time.sleep(sleep_time)
174+
stat = jobq.stat(probe)
175+
176+
self.assertEqual(doing, stat['doing'], case_mes)
177+
178+
# use the last stat
179+
180+
workers = stat['workers']
181+
self.assertEqual(2, len(workers))
182+
183+
th.join()
184+
143185

144186
class TestDispatcher(unittest.TestCase):
145187

@@ -264,6 +306,37 @@ def collect(args):
264306
self.assertEqual([0], rst)
265307

266308

309+
class TestExpire(unittest.TestCase):
310+
311+
def test_expire(self):
312+
313+
def _sleep_1(args):
314+
sleep_got.append(args)
315+
time.sleep(0.5)
316+
return args
317+
318+
def collect(args):
319+
rst.append(args)
320+
321+
rst = []
322+
sleep_got = []
323+
324+
jm = jobq.JobManager([(_sleep_1, 3), collect], expire=0.2, partial_order=True)
325+
326+
n = 6
327+
for i in range(n):
328+
jm.put((i % 2, i))
329+
330+
time.sleep(0.1)
331+
self.assertEqual(set([(0, 0), (1, 1)]), set(sleep_got))
332+
333+
time.sleep(0.3)
334+
self.assertEqual(3, len(sleep_got))
335+
336+
jm.join()
337+
self.assertEqual(set([(i % 2, i) for i in range(n)]), set(rst))
338+
339+
267340
class TestJobManager(unittest.TestCase):
268341

269342
def test_manager(self):
@@ -434,6 +507,42 @@ def _change_thread_nr():
434507
for th in ths:
435508
th.join()
436509

510+
def test_set_thread_num_partial_order(self):
511+
512+
def _pass(args):
513+
return args
514+
515+
rst = []
516+
517+
jm = jobq.JobManager([_pass, rst.append], partial_order=True)
518+
519+
setter = {'running': True}
520+
521+
def _change_thread_nr():
522+
while setter['running']:
523+
jm.set_thread_num(_pass, random.randint(1, 4))
524+
time.sleep(0.5)
525+
526+
ths = []
527+
for ii in range(3):
528+
th = threadutil.start_daemon_thread(_change_thread_nr)
529+
ths.append(th)
530+
531+
n = 10240
532+
for i in range(n):
533+
jm.put([i % 2, i])
534+
535+
jm.join()
536+
537+
self.assertEqual(n, len(rst))
538+
539+
self.assertTrue(is_partial_order(rst))
540+
541+
setter['running'] = False
542+
543+
for th in ths:
544+
th.join()
545+
437546

438547
class TestJobQ(unittest.TestCase):
439548

@@ -506,6 +615,36 @@ def collect(args):
506615
jobq.run(inp, workers + [collect], keep_order=True)
507616
self.assertEqual(out, rst)
508617

618+
def add_list(args):
619+
args[1] += 1
620+
return args
621+
622+
def multi2_list(args):
623+
args[1] *= 2
624+
return args
625+
626+
def multi2_list_sleep(args):
627+
time.sleep(0.02)
628+
args[1] *= 2
629+
return args
630+
631+
cases = (
632+
(list([i % 2, i] for i in range(100)), [add_list, (multi2_list_sleep, 10)],
633+
list([i % 2, (i + 1) * 2] for i in range(100))
634+
),
635+
(list([i % 50, i] for i in range(1024 * 10)), [add_list, (multi2_list, 10)],
636+
list([i % 50, (i + 1) * 2] for i in range(1024 * 10))
637+
),
638+
)
639+
for inp, workers, out in cases:
640+
rst = []
641+
jobq.run(inp, workers + [collect], partial_order=True)
642+
self.assertTrue(is_partial_order(rst))
643+
644+
out.sort()
645+
rst.sort()
646+
self.assertEqual(out, rst)
647+
509648
def test_generator(self):
510649

511650
def gen(args):
@@ -514,6 +653,7 @@ def gen(args):
514653
time.sleep(0.1)
515654

516655
def collect(args):
656+
time.sleep(random.uniform(0.005, 0.02))
517657
rst.append(args)
518658

519659
rst = []
@@ -528,6 +668,17 @@ def collect(args):
528668

529669
self.assertEqual(9, len(rst), 'nr of elts')
530670

671+
def _gen(args):
672+
k = args[0]
673+
for i in range(3):
674+
yield (k, i)
675+
676+
rst = []
677+
jobq.run([(k, 0) for k in range(3)], [(_gen, 3), (collect, 3)], partial_order=True)
678+
self.assertEqual(set([(k, v) for k in range(3) for v in range(3)]), set(rst),
679+
"generator should get all")
680+
self.assertTrue(is_partial_order(rst))
681+
531682

532683
class TestDefaultTimeout(unittest.TestCase):
533684

0 commit comments

Comments
 (0)