##// END OF EJS Templates
Fix parallel test suite
Thomas Kluyver -
Show More
@@ -271,5 +271,5 b' class BaseParallelApplication(BaseIPythonApplication):'
271 271 " Making the likely assumption that it is."%pid
272 272 )
273 273 return True
274 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
274 pids = list(map(int, re.findall(r'^\W*\d+', output, re.MULTILINE)))
275 275 return pid in pids
@@ -80,7 +80,7 b' class AsyncResult(object):'
80 80 self._ready = False
81 81 self._outputs_ready = False
82 82 self._success = None
83 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
83 self._metadata = [self._client.metadata[id] for id in self.msg_ids]
84 84
85 85 def __repr__(self):
86 86 if self._ready:
@@ -143,7 +143,7 b' class AsyncResult(object):'
143 143 self._ready = self._client.wait(self.msg_ids, timeout)
144 144 if self._ready:
145 145 try:
146 results = map(self._client.results.get, self.msg_ids)
146 results = list(map(self._client.results.get, self.msg_ids))
147 147 self._result = results
148 148 if self._single_result:
149 149 r = results[0]
@@ -669,10 +669,10 b' class AsyncHubResult(AsyncResult):'
669 669 start = time.time()
670 670 if self._ready:
671 671 return
672 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
672 local_ids = [m for m in self.msg_ids if m in self._client.outstanding]
673 673 local_ready = self._client.wait(local_ids, timeout)
674 674 if local_ready:
675 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
675 remote_ids = [m for m in self.msg_ids if m not in self._client.results]
676 676 if not remote_ids:
677 677 self._ready = True
678 678 else:
@@ -687,7 +687,7 b' class AsyncHubResult(AsyncResult):'
687 687 self._ready = True
688 688 if self._ready:
689 689 try:
690 results = map(self._client.results.get, self.msg_ids)
690 results = list(map(self._client.results.get, self.msg_ids))
691 691 self._result = results
692 692 if self._single_result:
693 693 r = results[0]
@@ -702,6 +702,6 b' class AsyncHubResult(AsyncResult):'
702 702 else:
703 703 self._success = True
704 704 finally:
705 self._metadata = map(self._client.metadata.get, self.msg_ids)
705 self._metadata = [self._client.metadata[mid] for mid in self.msg_ids]
706 706
707 707 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
@@ -540,7 +540,7 b' class Client(HasTraits):'
540 540 self._ids.append(eid)
541 541 self._engines[eid] = v
542 542 self._ids = sorted(self._ids)
543 if sorted(self._engines.keys()) != range(len(self._engines)) and \
543 if sorted(self._engines.keys()) != list(range(len(self._engines))) and \
544 544 self._task_scheme == 'pure' and self._task_socket:
545 545 self._stop_scheduling_tasks()
546 546
@@ -581,7 +581,7 b' class Client(HasTraits):'
581 581 targets = [targets]
582 582
583 583 if isinstance(targets, slice):
584 indices = range(len(self._ids))[targets]
584 indices = list(range(len(self._ids))[targets])
585 585 ids = self.ids
586 586 targets = [ ids[i] for i in indices ]
587 587
@@ -1070,7 +1070,7 b' class Client(HasTraits):'
1070 1070 # index access
1071 1071 job = self.history[job]
1072 1072 elif isinstance(job, AsyncResult):
1073 map(theids.add, job.msg_ids)
1073 theids.update(job.msg_ids)
1074 1074 continue
1075 1075 theids.add(job)
1076 1076 if not theids.intersection(self.outstanding):
@@ -1132,7 +1132,7 b' class Client(HasTraits):'
1132 1132 msg_ids = []
1133 1133 if isinstance(jobs, string_types + (AsyncResult,)):
1134 1134 jobs = [jobs]
1135 bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult,)), jobs)
1135 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1136 1136 if bad_ids:
1137 1137 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1138 1138 for j in jobs:
@@ -1415,8 +1415,8 b' class Client(HasTraits):'
1415 1415 raise TypeError("indices must be str or int, not %r"%id)
1416 1416 theids.append(id)
1417 1417
1418 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1419 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1418 local_ids = [msg_id for msg_id in theids if (msg_id in self.outstanding or msg_id in self.results)]
1419 remote_ids = [msg_id for msg_id in theids if msg_id not in local_ids]
1420 1420
1421 1421 # given single msg_id initially, get_result shot get the result itself,
1422 1422 # not a length-one list
@@ -1641,7 +1641,7 b' class Client(HasTraits):'
1641 1641 if not targets: # needed as _build_targets otherwise uses all engines
1642 1642 return []
1643 1643 target_ids = self._build_targets(targets)[0]
1644 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1644 return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids]
1645 1645
1646 1646 def _build_msgids_from_jobs(self, jobs=None):
1647 1647 """Build a list of msg_ids from "jobs" """
@@ -1650,7 +1650,7 b' class Client(HasTraits):'
1650 1650 msg_ids = []
1651 1651 if isinstance(jobs, string_types + (AsyncResult,)):
1652 1652 jobs = [jobs]
1653 bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult)), jobs)
1653 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1654 1654 if bad_ids:
1655 1655 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1656 1656 for j in jobs:
@@ -1701,8 +1701,9 b' class Client(HasTraits):'
1701 1701 msg_ids = []
1702 1702 msg_ids.extend(self._build_msgids_from_target(targets))
1703 1703 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1704 map(self.results.pop, msg_ids)
1705 map(self.metadata.pop, msg_ids)
1704 for mid in msg_ids:
1705 self.results.pop(mid)
1706 self.metadata.pop(mid)
1706 1707
1707 1708
1708 1709 @spin_first
@@ -32,7 +32,7 b' from IPython.external.decorator import decorator'
32 32
33 33 from IPython.parallel import util
34 34 from IPython.parallel.controller.dependency import Dependency, dependent
35 from IPython.utils.py3compat import string_types, iteritems
35 from IPython.utils.py3compat import string_types, iteritems, PY3
36 36
37 37 from . import map as Map
38 38 from .asyncresult import AsyncResult, AsyncMapResult
@@ -52,7 +52,7 b' def save_ids(f, self, *args, **kwargs):'
52 52 nmsgs = len(self.client.history) - n_previous
53 53 msg_ids = self.client.history[-nmsgs:]
54 54 self.history.extend(msg_ids)
55 map(self.outstanding.add, msg_ids)
55 self.outstanding.update(msg_ids)
56 56 return ret
57 57
58 58 @decorator
@@ -952,8 +952,9 b' class LoadBalancedView(View):'
952 952 raise ValueError("Invalid dependency: %r"%value)
953 953 if 'timeout' in kwargs:
954 954 t = kwargs['timeout']
955 if not isinstance(t, (int, long, float, type(None))):
956 raise TypeError("Invalid type for timeout: %r"%type(t))
955 if not isinstance(t, (int, float, type(None))):
956 if (not PY3) and (not isinstance(t, long)):
957 raise TypeError("Invalid type for timeout: %r"%type(t))
957 958 if t is not None:
958 959 if t < 0:
959 960 raise ValueError("Invalid timeout: %s"%t)
@@ -56,7 +56,11 b' class dependent(object):'
56 56
57 57 def __init__(self, f, df, *dargs, **dkwargs):
58 58 self.f = f
59 self.__name__ = getattr(f, '__name__', 'f')
59 name = getattr(f, '__name__', 'f')
60 if py3compat.PY3:
61 self.__name__ = name
62 else:
63 self.func_name = name
60 64 self.df = df
61 65 self.dargs = dargs
62 66 self.dkwargs = dkwargs
@@ -125,10 +125,12 b' class HeartMonitor(LoggingConfigurable):'
125 125 goodhearts = self.hearts.intersection(self.responses)
126 126 missed_beats = self.hearts.difference(goodhearts)
127 127 newhearts = self.responses.difference(goodhearts)
128 map(self.handle_new_heart, newhearts)
128 for heart in newhearts:
129 self.handle_new_heart(heart)
129 130 heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
130 131 self.hearts)
131 map(self.handle_heart_failure, heartfailures)
132 for failure in heartfailures:
133 self.handle_heart_failure(failure)
132 134 self.on_probation = on_probation
133 135 self.responses = set()
134 136 #print self.on_probation, self.hearts
@@ -193,9 +193,9 b' class HubFactory(RegistrationFactory):'
193 193 help="IP on which to listen for monitor messages. [default: loopback]")
194 194 monitor_transport = Unicode('tcp', config=True,
195 195 help="0MQ transport for monitor messages. [default : tcp]")
196
196
197 197 _client_ip_default = _monitor_ip_default = _engine_ip_default
198
198
199 199
200 200 monitor_url = Unicode('')
201 201
@@ -1190,7 +1190,7 b' class Hub(SessionFactory):'
1190 1190 except Exception:
1191 1191 reply = error.wrap_exception()
1192 1192 else:
1193 pending = filter(lambda m: m in self.pending, msg_ids)
1193 pending = [m for m in msg_ids if (m in self.pending)]
1194 1194 if pending:
1195 1195 try:
1196 1196 raise IndexError("msg pending: %r" % pending[0])
@@ -1312,7 +1312,7 b' class Hub(SessionFactory):'
1312 1312 'io' : io_dict,
1313 1313 }
1314 1314 if rec['result_buffers']:
1315 buffers = map(bytes, rec['result_buffers'])
1315 buffers = list(map(bytes, rec['result_buffers']))
1316 1316 else:
1317 1317 buffers = []
1318 1318
@@ -62,7 +62,7 b' class MongoDB(BaseDB):'
62 62 def _binary_buffers(self, rec):
63 63 for key in ('buffers', 'result_buffers'):
64 64 if rec.get(key, None):
65 rec[key] = map(Binary, rec[key])
65 rec[key] = list(map(Binary, rec[key]))
66 66 return rec
67 67
68 68 def add_record(self, msg_id, rec):
@@ -364,7 +364,7 b' class TaskScheduler(SessionFactory):'
364 364 date=datetime.now(),
365 365 )
366 366 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
367 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
367 raw_reply = list(map(zmq.Message, self.session.serialize(msg, ident=idents)))
368 368 # and dispatch it
369 369 self.dispatch_result(raw_reply)
370 370
@@ -402,8 +402,7 b' class TaskScheduler(SessionFactory):'
402 402 # get targets as a set of bytes objects
403 403 # from a list of unicode objects
404 404 targets = md.get('targets', [])
405 targets = map(cast_bytes, targets)
406 targets = set(targets)
405 targets = set(map(cast_bytes, targets))
407 406
408 407 retries = md.get('retries', 0)
409 408 self.retries[msg_id] = retries
@@ -515,7 +514,7 b' class TaskScheduler(SessionFactory):'
515 514 def available_engines(self):
516 515 """return a list of available engine indices based on HWM"""
517 516 if not self.hwm:
518 return range(len(self.targets))
517 return list(range(len(self.targets)))
519 518 available = []
520 519 for idx in range(len(self.targets)):
521 520 if self.loads[idx] < self.hwm:
@@ -547,7 +546,7 b' class TaskScheduler(SessionFactory):'
547 546 # check follow
548 547 return job.follow.check(self.completed[target], self.failed[target])
549 548
550 indices = filter(can_run, available)
549 indices = list(filter(can_run, available))
551 550
552 551 if not indices:
553 552 # couldn't run
@@ -76,7 +76,7 b' def _adapt_bufs(bufs):'
76 76 # this is *horrible*
77 77 # copy buffers into single list and pickle it:
78 78 if bufs and isinstance(bufs[0], (bytes, buffer)):
79 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
79 return sqlite3.Binary(pickle.dumps(list(map(bytes, bufs)),-1))
80 80 elif bufs:
81 81 return bufs
82 82 else:
@@ -83,7 +83,7 b' class AsyncResultTest(ClusterTestCase):'
83 83
84 84 def test_get_dict_single(self):
85 85 view = self.client[-1]
86 for v in (range(5), 5, ('abc', 'def'), 'string'):
86 for v in (list(range(5)), 5, ('abc', 'def'), 'string'):
87 87 ar = view.apply_async(echo, v)
88 88 self.assertEqual(ar.get(), v)
89 89 d = ar.get_dict()
@@ -146,11 +146,11 b' class AsyncResultTest(ClusterTestCase):'
146 146
147 147 def test_len(self):
148 148 v = self.client.load_balanced_view()
149 ar = v.map_async(lambda x: x, range(10))
149 ar = v.map_async(lambda x: x, list(range(10)))
150 150 self.assertEqual(len(ar), 10)
151 ar = v.apply_async(lambda x: x, range(10))
151 ar = v.apply_async(lambda x: x, list(range(10)))
152 152 self.assertEqual(len(ar), 1)
153 ar = self.client[:].apply_async(lambda x: x, range(10))
153 ar = self.client[:].apply_async(lambda x: x, list(range(10)))
154 154 self.assertEqual(len(ar), len(self.client.ids))
155 155
156 156 def test_wall_time_single(self):
@@ -95,7 +95,7 b' class TestClient(ClusterTestCase):'
95 95
96 96 def double(x):
97 97 return x*2
98 seq = range(100)
98 seq = list(range(100))
99 99 ref = [ double(x) for x in seq ]
100 100
101 101 # add some engines, which should be used
@@ -41,9 +41,9 b' def wait(n):'
41 41 def func(x):
42 42 return x*x
43 43
44 mixed = map(str, range(10))
45 completed = map(str, range(0,10,2))
46 failed = map(str, range(1,10,2))
44 mixed = list(map(str, range(10)))
45 completed = list(map(str, range(0,10,2)))
46 failed = list(map(str, range(1,10,2)))
47 47
48 48 class DependencyTest(ClusterTestCase):
49 49
@@ -74,12 +74,12 b' class DependencyTest(ClusterTestCase):'
74 74 def test_require_imports(self):
75 75 """test that @require imports names"""
76 76 @self.cancan
77 @pmod.require('urllib')
77 @pmod.require('base64')
78 78 @interactive
79 def encode(dikt):
80 return urllib.urlencode(dikt)
79 def encode(arg):
80 return base64.b64encode(arg)
81 81 # must pass through canning to properly connect namespaces
82 self.assertEqual(encode(dict(a=5)), 'a=5')
82 self.assertEqual(encode(b'foo'), b'Zm9v')
83 83
84 84 def test_success_only(self):
85 85 dep = pmod.Dependency(mixed, success=True, failure=False)
@@ -55,17 +55,17 b' class TestLoadBalancedView(ClusterTestCase):'
55 55 def test_map(self):
56 56 def f(x):
57 57 return x**2
58 data = range(16)
58 data = list(range(16))
59 59 r = self.view.map_sync(f, data)
60 self.assertEqual(r, map(f, data))
60 self.assertEqual(r, list(map(f, data)))
61 61
62 62 def test_map_generator(self):
63 63 def f(x):
64 64 return x**2
65 65
66 data = range(16)
66 data = list(range(16))
67 67 r = self.view.map_sync(f, iter(data))
68 self.assertEqual(r, map(f, iter(data)))
68 self.assertEqual(r, list(map(f, iter(data))))
69 69
70 70 def test_map_short_first(self):
71 71 def f(x,y):
@@ -74,11 +74,11 b' class TestLoadBalancedView(ClusterTestCase):'
74 74 if x is None:
75 75 return x
76 76 return x*y
77 data = range(10)
78 data2 = range(4)
77 data = list(range(10))
78 data2 = list(range(4))
79 79
80 80 r = self.view.map_sync(f, data, data2)
81 self.assertEqual(r, map(f, data, data2))
81 self.assertEqual(r, list(map(f, data, data2)))
82 82
83 83 def test_map_short_last(self):
84 84 def f(x,y):
@@ -87,11 +87,11 b' class TestLoadBalancedView(ClusterTestCase):'
87 87 if x is None:
88 88 return x
89 89 return x*y
90 data = range(4)
91 data2 = range(10)
90 data = list(range(4))
91 data2 = list(range(10))
92 92
93 93 r = self.view.map_sync(f, data, data2)
94 self.assertEqual(r, map(f, data, data2))
94 self.assertEqual(r, list(map(f, data, data2)))
95 95
96 96 def test_map_unordered(self):
97 97 def f(x):
@@ -100,8 +100,8 b' class TestLoadBalancedView(ClusterTestCase):'
100 100 import time
101 101 time.sleep(0.05*x)
102 102 return x**2
103 data = range(16,0,-1)
104 reference = map(f, data)
103 data = list(range(16,0,-1))
104 reference = list(map(f, data))
105 105
106 106 amr = self.view.map_async(slow_f, data, ordered=False)
107 107 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
@@ -119,8 +119,8 b' class TestLoadBalancedView(ClusterTestCase):'
119 119 import time
120 120 time.sleep(0.05*x)
121 121 return x**2
122 data = range(16,0,-1)
123 reference = map(f, data)
122 data = list(range(16,0,-1))
123 reference = list(map(f, data))
124 124
125 125 amr = self.view.map_async(slow_f, data)
126 126 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
@@ -69,7 +69,7 b' class TestView(ClusterTestCase):'
69 69
70 70 def test_push_pull(self):
71 71 """test pushing and pulling"""
72 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
72 data = dict(a=10, b=1.05, c=list(range(10)), d={'e':(1,2),'f':'hi'})
73 73 t = self.client.ids[-1]
74 74 v = self.client[t]
75 75 push = v.push
@@ -230,7 +230,7 b' class TestView(ClusterTestCase):'
230 230
231 231 def test_scatter_gather(self):
232 232 view = self.client[:]
233 seq1 = range(16)
233 seq1 = list(range(16))
234 234 view.scatter('a', seq1)
235 235 seq2 = view.gather('a', block=True)
236 236 self.assertEqual(seq2, seq1)
@@ -249,7 +249,7 b' class TestView(ClusterTestCase):'
249 249 def test_scatter_gather_lazy(self):
250 250 """scatter/gather with targets='all'"""
251 251 view = self.client.direct_view(targets='all')
252 x = range(64)
252 x = list(range(64))
253 253 view.scatter('x', x)
254 254 gathered = view.gather('x', block=True)
255 255 self.assertEqual(gathered, x)
@@ -315,7 +315,7 b' class TestView(ClusterTestCase):'
315 315 """push/pull pandas.TimeSeries"""
316 316 import pandas
317 317
318 ts = pandas.TimeSeries(range(10))
318 ts = pandas.TimeSeries(list(range(10)))
319 319
320 320 view = self.client[-1]
321 321
@@ -329,9 +329,9 b' class TestView(ClusterTestCase):'
329 329 view = self.client[:]
330 330 def f(x):
331 331 return x**2
332 data = range(16)
332 data = list(range(16))
333 333 r = view.map_sync(f, data)
334 self.assertEqual(r, map(f, data))
334 self.assertEqual(r, list(map(f, data)))
335 335
336 336 def test_map_iterable(self):
337 337 """test map on iterables (direct)"""
@@ -356,7 +356,7 b' class TestView(ClusterTestCase):'
356 356 assert_array_equal(r, arr)
357 357
358 358 def test_scatter_gather_nonblocking(self):
359 data = range(16)
359 data = list(range(16))
360 360 view = self.client[:]
361 361 view.scatter('a', data, block=False)
362 362 ar = view.gather('a', block=False)
@@ -451,7 +451,7 b' class TestView(ClusterTestCase):'
451 451
452 452 @interactive
453 453 def check_unicode(a, check):
454 assert isinstance(a, unicode_type), "%r is not unicode"%a
454 assert not isinstance(a, bytes), "%r is bytes, not unicode"%a
455 455 assert isinstance(check, bytes), "%r is not bytes"%check
456 456 assert a.encode('utf8') == check, "%s != %s"%(a,check)
457 457
@@ -488,7 +488,7 b' class TestView(ClusterTestCase):'
488 488
489 489 def test_eval_reference(self):
490 490 v = self.client[self.client.ids[0]]
491 v['g'] = range(5)
491 v['g'] = list(range(5))
492 492 rg = pmod.Reference('g[0]')
493 493 echo = lambda x:x
494 494 self.assertEqual(v.apply_sync(echo, rg), 0)
@@ -501,7 +501,7 b' class TestView(ClusterTestCase):'
501 501
502 502 def test_single_engine_map(self):
503 503 e0 = self.client[self.client.ids[0]]
504 r = range(5)
504 r = list(range(5))
505 505 check = [ -1*i for i in r ]
506 506 result = e0.map_sync(lambda x: -1*x, r)
507 507 self.assertEqual(result, check)
@@ -231,7 +231,7 b' def _push(**ns):'
231 231 while tmp in user_ns:
232 232 tmp = tmp + '_'
233 233 try:
234 for name, value in iteritems(ns):
234 for name, value in ns.items():
235 235 user_ns[tmp] = value
236 236 exec("%s = %s" % (name, tmp), user_ns)
237 237 finally:
@@ -241,7 +241,7 b' def _push(**ns):'
241 241 def _pull(keys):
242 242 """helper method for implementing `client.pull` via `client.apply`"""
243 243 if isinstance(keys, (list,tuple, set)):
244 return map(lambda key: eval(key, globals()), keys)
244 return [eval(key, globals()) for key in keys]
245 245 else:
246 246 return eval(keys, globals())
247 247
General Comments 0
You need to be logged in to leave comments. Login now