##// END OF EJS Templates
Fix parallel test suite
Thomas Kluyver -
Show More
@@ -271,5 +271,5 b' class BaseParallelApplication(BaseIPythonApplication):'
271 " Making the likely assumption that it is."%pid
271 " Making the likely assumption that it is."%pid
272 )
272 )
273 return True
273 return True
274 pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
274 pids = list(map(int, re.findall(r'^\W*\d+', output, re.MULTILINE)))
275 return pid in pids
275 return pid in pids
@@ -80,7 +80,7 b' class AsyncResult(object):'
80 self._ready = False
80 self._ready = False
81 self._outputs_ready = False
81 self._outputs_ready = False
82 self._success = None
82 self._success = None
83 self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
83 self._metadata = [self._client.metadata[id] for id in self.msg_ids]
84
84
85 def __repr__(self):
85 def __repr__(self):
86 if self._ready:
86 if self._ready:
@@ -143,7 +143,7 b' class AsyncResult(object):'
143 self._ready = self._client.wait(self.msg_ids, timeout)
143 self._ready = self._client.wait(self.msg_ids, timeout)
144 if self._ready:
144 if self._ready:
145 try:
145 try:
146 results = map(self._client.results.get, self.msg_ids)
146 results = list(map(self._client.results.get, self.msg_ids))
147 self._result = results
147 self._result = results
148 if self._single_result:
148 if self._single_result:
149 r = results[0]
149 r = results[0]
@@ -669,10 +669,10 b' class AsyncHubResult(AsyncResult):'
669 start = time.time()
669 start = time.time()
670 if self._ready:
670 if self._ready:
671 return
671 return
672 local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
672 local_ids = [m for m in self.msg_ids if m in self._client.outstanding]
673 local_ready = self._client.wait(local_ids, timeout)
673 local_ready = self._client.wait(local_ids, timeout)
674 if local_ready:
674 if local_ready:
675 remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
675 remote_ids = [m for m in self.msg_ids if m not in self._client.results]
676 if not remote_ids:
676 if not remote_ids:
677 self._ready = True
677 self._ready = True
678 else:
678 else:
@@ -687,7 +687,7 b' class AsyncHubResult(AsyncResult):'
687 self._ready = True
687 self._ready = True
688 if self._ready:
688 if self._ready:
689 try:
689 try:
690 results = map(self._client.results.get, self.msg_ids)
690 results = list(map(self._client.results.get, self.msg_ids))
691 self._result = results
691 self._result = results
692 if self._single_result:
692 if self._single_result:
693 r = results[0]
693 r = results[0]
@@ -702,6 +702,6 b' class AsyncHubResult(AsyncResult):'
702 else:
702 else:
703 self._success = True
703 self._success = True
704 finally:
704 finally:
705 self._metadata = map(self._client.metadata.get, self.msg_ids)
705 self._metadata = [self._client.metadata[mid] for mid in self.msg_ids]
706
706
707 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
707 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
@@ -540,7 +540,7 b' class Client(HasTraits):'
540 self._ids.append(eid)
540 self._ids.append(eid)
541 self._engines[eid] = v
541 self._engines[eid] = v
542 self._ids = sorted(self._ids)
542 self._ids = sorted(self._ids)
543 if sorted(self._engines.keys()) != range(len(self._engines)) and \
543 if sorted(self._engines.keys()) != list(range(len(self._engines))) and \
544 self._task_scheme == 'pure' and self._task_socket:
544 self._task_scheme == 'pure' and self._task_socket:
545 self._stop_scheduling_tasks()
545 self._stop_scheduling_tasks()
546
546
@@ -581,7 +581,7 b' class Client(HasTraits):'
581 targets = [targets]
581 targets = [targets]
582
582
583 if isinstance(targets, slice):
583 if isinstance(targets, slice):
584 indices = range(len(self._ids))[targets]
584 indices = list(range(len(self._ids))[targets])
585 ids = self.ids
585 ids = self.ids
586 targets = [ ids[i] for i in indices ]
586 targets = [ ids[i] for i in indices ]
587
587
@@ -1070,7 +1070,7 b' class Client(HasTraits):'
1070 # index access
1070 # index access
1071 job = self.history[job]
1071 job = self.history[job]
1072 elif isinstance(job, AsyncResult):
1072 elif isinstance(job, AsyncResult):
1073 map(theids.add, job.msg_ids)
1073 theids.update(job.msg_ids)
1074 continue
1074 continue
1075 theids.add(job)
1075 theids.add(job)
1076 if not theids.intersection(self.outstanding):
1076 if not theids.intersection(self.outstanding):
@@ -1132,7 +1132,7 b' class Client(HasTraits):'
1132 msg_ids = []
1132 msg_ids = []
1133 if isinstance(jobs, string_types + (AsyncResult,)):
1133 if isinstance(jobs, string_types + (AsyncResult,)):
1134 jobs = [jobs]
1134 jobs = [jobs]
1135 bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult,)), jobs)
1135 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1136 if bad_ids:
1136 if bad_ids:
1137 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1137 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1138 for j in jobs:
1138 for j in jobs:
@@ -1415,8 +1415,8 b' class Client(HasTraits):'
1415 raise TypeError("indices must be str or int, not %r"%id)
1415 raise TypeError("indices must be str or int, not %r"%id)
1416 theids.append(id)
1416 theids.append(id)
1417
1417
1418 local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
1418 local_ids = [msg_id for msg_id in theids if (msg_id in self.outstanding or msg_id in self.results)]
1419 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1419 remote_ids = [msg_id for msg_id in theids if msg_id not in local_ids]
1420
1420
1421 # given single msg_id initially, get_result shot get the result itself,
1421 # given single msg_id initially, get_result shot get the result itself,
1422 # not a length-one list
1422 # not a length-one list
@@ -1641,7 +1641,7 b' class Client(HasTraits):'
1641 if not targets: # needed as _build_targets otherwise uses all engines
1641 if not targets: # needed as _build_targets otherwise uses all engines
1642 return []
1642 return []
1643 target_ids = self._build_targets(targets)[0]
1643 target_ids = self._build_targets(targets)[0]
1644 return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
1644 return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids]
1645
1645
1646 def _build_msgids_from_jobs(self, jobs=None):
1646 def _build_msgids_from_jobs(self, jobs=None):
1647 """Build a list of msg_ids from "jobs" """
1647 """Build a list of msg_ids from "jobs" """
@@ -1650,7 +1650,7 b' class Client(HasTraits):'
1650 msg_ids = []
1650 msg_ids = []
1651 if isinstance(jobs, string_types + (AsyncResult,)):
1651 if isinstance(jobs, string_types + (AsyncResult,)):
1652 jobs = [jobs]
1652 jobs = [jobs]
1653 bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult)), jobs)
1653 bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
1654 if bad_ids:
1654 if bad_ids:
1655 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1655 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1656 for j in jobs:
1656 for j in jobs:
@@ -1701,8 +1701,9 b' class Client(HasTraits):'
1701 msg_ids = []
1701 msg_ids = []
1702 msg_ids.extend(self._build_msgids_from_target(targets))
1702 msg_ids.extend(self._build_msgids_from_target(targets))
1703 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1703 msg_ids.extend(self._build_msgids_from_jobs(jobs))
1704 map(self.results.pop, msg_ids)
1704 for mid in msg_ids:
1705 map(self.metadata.pop, msg_ids)
1705 self.results.pop(mid)
1706 self.metadata.pop(mid)
1706
1707
1707
1708
1708 @spin_first
1709 @spin_first
@@ -32,7 +32,7 b' from IPython.external.decorator import decorator'
32
32
33 from IPython.parallel import util
33 from IPython.parallel import util
34 from IPython.parallel.controller.dependency import Dependency, dependent
34 from IPython.parallel.controller.dependency import Dependency, dependent
35 from IPython.utils.py3compat import string_types, iteritems
35 from IPython.utils.py3compat import string_types, iteritems, PY3
36
36
37 from . import map as Map
37 from . import map as Map
38 from .asyncresult import AsyncResult, AsyncMapResult
38 from .asyncresult import AsyncResult, AsyncMapResult
@@ -52,7 +52,7 b' def save_ids(f, self, *args, **kwargs):'
52 nmsgs = len(self.client.history) - n_previous
52 nmsgs = len(self.client.history) - n_previous
53 msg_ids = self.client.history[-nmsgs:]
53 msg_ids = self.client.history[-nmsgs:]
54 self.history.extend(msg_ids)
54 self.history.extend(msg_ids)
55 map(self.outstanding.add, msg_ids)
55 self.outstanding.update(msg_ids)
56 return ret
56 return ret
57
57
58 @decorator
58 @decorator
@@ -952,8 +952,9 b' class LoadBalancedView(View):'
952 raise ValueError("Invalid dependency: %r"%value)
952 raise ValueError("Invalid dependency: %r"%value)
953 if 'timeout' in kwargs:
953 if 'timeout' in kwargs:
954 t = kwargs['timeout']
954 t = kwargs['timeout']
955 if not isinstance(t, (int, long, float, type(None))):
955 if not isinstance(t, (int, float, type(None))):
956 raise TypeError("Invalid type for timeout: %r"%type(t))
956 if (not PY3) and (not isinstance(t, long)):
957 raise TypeError("Invalid type for timeout: %r"%type(t))
957 if t is not None:
958 if t is not None:
958 if t < 0:
959 if t < 0:
959 raise ValueError("Invalid timeout: %s"%t)
960 raise ValueError("Invalid timeout: %s"%t)
@@ -56,7 +56,11 b' class dependent(object):'
56
56
57 def __init__(self, f, df, *dargs, **dkwargs):
57 def __init__(self, f, df, *dargs, **dkwargs):
58 self.f = f
58 self.f = f
59 self.__name__ = getattr(f, '__name__', 'f')
59 name = getattr(f, '__name__', 'f')
60 if py3compat.PY3:
61 self.__name__ = name
62 else:
63 self.func_name = name
60 self.df = df
64 self.df = df
61 self.dargs = dargs
65 self.dargs = dargs
62 self.dkwargs = dkwargs
66 self.dkwargs = dkwargs
@@ -125,10 +125,12 b' class HeartMonitor(LoggingConfigurable):'
125 goodhearts = self.hearts.intersection(self.responses)
125 goodhearts = self.hearts.intersection(self.responses)
126 missed_beats = self.hearts.difference(goodhearts)
126 missed_beats = self.hearts.difference(goodhearts)
127 newhearts = self.responses.difference(goodhearts)
127 newhearts = self.responses.difference(goodhearts)
128 map(self.handle_new_heart, newhearts)
128 for heart in newhearts:
129 self.handle_new_heart(heart)
129 heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
130 heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
130 self.hearts)
131 self.hearts)
131 map(self.handle_heart_failure, heartfailures)
132 for failure in heartfailures:
133 self.handle_heart_failure(failure)
132 self.on_probation = on_probation
134 self.on_probation = on_probation
133 self.responses = set()
135 self.responses = set()
134 #print self.on_probation, self.hearts
136 #print self.on_probation, self.hearts
@@ -193,9 +193,9 b' class HubFactory(RegistrationFactory):'
193 help="IP on which to listen for monitor messages. [default: loopback]")
193 help="IP on which to listen for monitor messages. [default: loopback]")
194 monitor_transport = Unicode('tcp', config=True,
194 monitor_transport = Unicode('tcp', config=True,
195 help="0MQ transport for monitor messages. [default : tcp]")
195 help="0MQ transport for monitor messages. [default : tcp]")
196
196
197 _client_ip_default = _monitor_ip_default = _engine_ip_default
197 _client_ip_default = _monitor_ip_default = _engine_ip_default
198
198
199
199
200 monitor_url = Unicode('')
200 monitor_url = Unicode('')
201
201
@@ -1190,7 +1190,7 b' class Hub(SessionFactory):'
1190 except Exception:
1190 except Exception:
1191 reply = error.wrap_exception()
1191 reply = error.wrap_exception()
1192 else:
1192 else:
1193 pending = filter(lambda m: m in self.pending, msg_ids)
1193 pending = [m for m in msg_ids if (m in self.pending)]
1194 if pending:
1194 if pending:
1195 try:
1195 try:
1196 raise IndexError("msg pending: %r" % pending[0])
1196 raise IndexError("msg pending: %r" % pending[0])
@@ -1312,7 +1312,7 b' class Hub(SessionFactory):'
1312 'io' : io_dict,
1312 'io' : io_dict,
1313 }
1313 }
1314 if rec['result_buffers']:
1314 if rec['result_buffers']:
1315 buffers = map(bytes, rec['result_buffers'])
1315 buffers = list(map(bytes, rec['result_buffers']))
1316 else:
1316 else:
1317 buffers = []
1317 buffers = []
1318
1318
@@ -62,7 +62,7 b' class MongoDB(BaseDB):'
62 def _binary_buffers(self, rec):
62 def _binary_buffers(self, rec):
63 for key in ('buffers', 'result_buffers'):
63 for key in ('buffers', 'result_buffers'):
64 if rec.get(key, None):
64 if rec.get(key, None):
65 rec[key] = map(Binary, rec[key])
65 rec[key] = list(map(Binary, rec[key]))
66 return rec
66 return rec
67
67
68 def add_record(self, msg_id, rec):
68 def add_record(self, msg_id, rec):
@@ -364,7 +364,7 b' class TaskScheduler(SessionFactory):'
364 date=datetime.now(),
364 date=datetime.now(),
365 )
365 )
366 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
366 msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
367 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
367 raw_reply = list(map(zmq.Message, self.session.serialize(msg, ident=idents)))
368 # and dispatch it
368 # and dispatch it
369 self.dispatch_result(raw_reply)
369 self.dispatch_result(raw_reply)
370
370
@@ -402,8 +402,7 b' class TaskScheduler(SessionFactory):'
402 # get targets as a set of bytes objects
402 # get targets as a set of bytes objects
403 # from a list of unicode objects
403 # from a list of unicode objects
404 targets = md.get('targets', [])
404 targets = md.get('targets', [])
405 targets = map(cast_bytes, targets)
405 targets = set(map(cast_bytes, targets))
406 targets = set(targets)
407
406
408 retries = md.get('retries', 0)
407 retries = md.get('retries', 0)
409 self.retries[msg_id] = retries
408 self.retries[msg_id] = retries
@@ -515,7 +514,7 b' class TaskScheduler(SessionFactory):'
515 def available_engines(self):
514 def available_engines(self):
516 """return a list of available engine indices based on HWM"""
515 """return a list of available engine indices based on HWM"""
517 if not self.hwm:
516 if not self.hwm:
518 return range(len(self.targets))
517 return list(range(len(self.targets)))
519 available = []
518 available = []
520 for idx in range(len(self.targets)):
519 for idx in range(len(self.targets)):
521 if self.loads[idx] < self.hwm:
520 if self.loads[idx] < self.hwm:
@@ -547,7 +546,7 b' class TaskScheduler(SessionFactory):'
547 # check follow
546 # check follow
548 return job.follow.check(self.completed[target], self.failed[target])
547 return job.follow.check(self.completed[target], self.failed[target])
549
548
550 indices = filter(can_run, available)
549 indices = list(filter(can_run, available))
551
550
552 if not indices:
551 if not indices:
553 # couldn't run
552 # couldn't run
@@ -76,7 +76,7 b' def _adapt_bufs(bufs):'
76 # this is *horrible*
76 # this is *horrible*
77 # copy buffers into single list and pickle it:
77 # copy buffers into single list and pickle it:
78 if bufs and isinstance(bufs[0], (bytes, buffer)):
78 if bufs and isinstance(bufs[0], (bytes, buffer)):
79 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
79 return sqlite3.Binary(pickle.dumps(list(map(bytes, bufs)),-1))
80 elif bufs:
80 elif bufs:
81 return bufs
81 return bufs
82 else:
82 else:
@@ -83,7 +83,7 b' class AsyncResultTest(ClusterTestCase):'
83
83
84 def test_get_dict_single(self):
84 def test_get_dict_single(self):
85 view = self.client[-1]
85 view = self.client[-1]
86 for v in (range(5), 5, ('abc', 'def'), 'string'):
86 for v in (list(range(5)), 5, ('abc', 'def'), 'string'):
87 ar = view.apply_async(echo, v)
87 ar = view.apply_async(echo, v)
88 self.assertEqual(ar.get(), v)
88 self.assertEqual(ar.get(), v)
89 d = ar.get_dict()
89 d = ar.get_dict()
@@ -146,11 +146,11 b' class AsyncResultTest(ClusterTestCase):'
146
146
147 def test_len(self):
147 def test_len(self):
148 v = self.client.load_balanced_view()
148 v = self.client.load_balanced_view()
149 ar = v.map_async(lambda x: x, range(10))
149 ar = v.map_async(lambda x: x, list(range(10)))
150 self.assertEqual(len(ar), 10)
150 self.assertEqual(len(ar), 10)
151 ar = v.apply_async(lambda x: x, range(10))
151 ar = v.apply_async(lambda x: x, list(range(10)))
152 self.assertEqual(len(ar), 1)
152 self.assertEqual(len(ar), 1)
153 ar = self.client[:].apply_async(lambda x: x, range(10))
153 ar = self.client[:].apply_async(lambda x: x, list(range(10)))
154 self.assertEqual(len(ar), len(self.client.ids))
154 self.assertEqual(len(ar), len(self.client.ids))
155
155
156 def test_wall_time_single(self):
156 def test_wall_time_single(self):
@@ -95,7 +95,7 b' class TestClient(ClusterTestCase):'
95
95
96 def double(x):
96 def double(x):
97 return x*2
97 return x*2
98 seq = range(100)
98 seq = list(range(100))
99 ref = [ double(x) for x in seq ]
99 ref = [ double(x) for x in seq ]
100
100
101 # add some engines, which should be used
101 # add some engines, which should be used
@@ -41,9 +41,9 b' def wait(n):'
41 def func(x):
41 def func(x):
42 return x*x
42 return x*x
43
43
44 mixed = map(str, range(10))
44 mixed = list(map(str, range(10)))
45 completed = map(str, range(0,10,2))
45 completed = list(map(str, range(0,10,2)))
46 failed = map(str, range(1,10,2))
46 failed = list(map(str, range(1,10,2)))
47
47
48 class DependencyTest(ClusterTestCase):
48 class DependencyTest(ClusterTestCase):
49
49
@@ -74,12 +74,12 b' class DependencyTest(ClusterTestCase):'
74 def test_require_imports(self):
74 def test_require_imports(self):
75 """test that @require imports names"""
75 """test that @require imports names"""
76 @self.cancan
76 @self.cancan
77 @pmod.require('urllib')
77 @pmod.require('base64')
78 @interactive
78 @interactive
79 def encode(dikt):
79 def encode(arg):
80 return urllib.urlencode(dikt)
80 return base64.b64encode(arg)
81 # must pass through canning to properly connect namespaces
81 # must pass through canning to properly connect namespaces
82 self.assertEqual(encode(dict(a=5)), 'a=5')
82 self.assertEqual(encode(b'foo'), b'Zm9v')
83
83
84 def test_success_only(self):
84 def test_success_only(self):
85 dep = pmod.Dependency(mixed, success=True, failure=False)
85 dep = pmod.Dependency(mixed, success=True, failure=False)
@@ -55,17 +55,17 b' class TestLoadBalancedView(ClusterTestCase):'
55 def test_map(self):
55 def test_map(self):
56 def f(x):
56 def f(x):
57 return x**2
57 return x**2
58 data = range(16)
58 data = list(range(16))
59 r = self.view.map_sync(f, data)
59 r = self.view.map_sync(f, data)
60 self.assertEqual(r, map(f, data))
60 self.assertEqual(r, list(map(f, data)))
61
61
62 def test_map_generator(self):
62 def test_map_generator(self):
63 def f(x):
63 def f(x):
64 return x**2
64 return x**2
65
65
66 data = range(16)
66 data = list(range(16))
67 r = self.view.map_sync(f, iter(data))
67 r = self.view.map_sync(f, iter(data))
68 self.assertEqual(r, map(f, iter(data)))
68 self.assertEqual(r, list(map(f, iter(data))))
69
69
70 def test_map_short_first(self):
70 def test_map_short_first(self):
71 def f(x,y):
71 def f(x,y):
@@ -74,11 +74,11 b' class TestLoadBalancedView(ClusterTestCase):'
74 if x is None:
74 if x is None:
75 return x
75 return x
76 return x*y
76 return x*y
77 data = range(10)
77 data = list(range(10))
78 data2 = range(4)
78 data2 = list(range(4))
79
79
80 r = self.view.map_sync(f, data, data2)
80 r = self.view.map_sync(f, data, data2)
81 self.assertEqual(r, map(f, data, data2))
81 self.assertEqual(r, list(map(f, data, data2)))
82
82
83 def test_map_short_last(self):
83 def test_map_short_last(self):
84 def f(x,y):
84 def f(x,y):
@@ -87,11 +87,11 b' class TestLoadBalancedView(ClusterTestCase):'
87 if x is None:
87 if x is None:
88 return x
88 return x
89 return x*y
89 return x*y
90 data = range(4)
90 data = list(range(4))
91 data2 = range(10)
91 data2 = list(range(10))
92
92
93 r = self.view.map_sync(f, data, data2)
93 r = self.view.map_sync(f, data, data2)
94 self.assertEqual(r, map(f, data, data2))
94 self.assertEqual(r, list(map(f, data, data2)))
95
95
96 def test_map_unordered(self):
96 def test_map_unordered(self):
97 def f(x):
97 def f(x):
@@ -100,8 +100,8 b' class TestLoadBalancedView(ClusterTestCase):'
100 import time
100 import time
101 time.sleep(0.05*x)
101 time.sleep(0.05*x)
102 return x**2
102 return x**2
103 data = range(16,0,-1)
103 data = list(range(16,0,-1))
104 reference = map(f, data)
104 reference = list(map(f, data))
105
105
106 amr = self.view.map_async(slow_f, data, ordered=False)
106 amr = self.view.map_async(slow_f, data, ordered=False)
107 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
107 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
@@ -119,8 +119,8 b' class TestLoadBalancedView(ClusterTestCase):'
119 import time
119 import time
120 time.sleep(0.05*x)
120 time.sleep(0.05*x)
121 return x**2
121 return x**2
122 data = range(16,0,-1)
122 data = list(range(16,0,-1))
123 reference = map(f, data)
123 reference = list(map(f, data))
124
124
125 amr = self.view.map_async(slow_f, data)
125 amr = self.view.map_async(slow_f, data)
126 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
126 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
@@ -69,7 +69,7 b' class TestView(ClusterTestCase):'
69
69
70 def test_push_pull(self):
70 def test_push_pull(self):
71 """test pushing and pulling"""
71 """test pushing and pulling"""
72 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
72 data = dict(a=10, b=1.05, c=list(range(10)), d={'e':(1,2),'f':'hi'})
73 t = self.client.ids[-1]
73 t = self.client.ids[-1]
74 v = self.client[t]
74 v = self.client[t]
75 push = v.push
75 push = v.push
@@ -230,7 +230,7 b' class TestView(ClusterTestCase):'
230
230
231 def test_scatter_gather(self):
231 def test_scatter_gather(self):
232 view = self.client[:]
232 view = self.client[:]
233 seq1 = range(16)
233 seq1 = list(range(16))
234 view.scatter('a', seq1)
234 view.scatter('a', seq1)
235 seq2 = view.gather('a', block=True)
235 seq2 = view.gather('a', block=True)
236 self.assertEqual(seq2, seq1)
236 self.assertEqual(seq2, seq1)
@@ -249,7 +249,7 b' class TestView(ClusterTestCase):'
249 def test_scatter_gather_lazy(self):
249 def test_scatter_gather_lazy(self):
250 """scatter/gather with targets='all'"""
250 """scatter/gather with targets='all'"""
251 view = self.client.direct_view(targets='all')
251 view = self.client.direct_view(targets='all')
252 x = range(64)
252 x = list(range(64))
253 view.scatter('x', x)
253 view.scatter('x', x)
254 gathered = view.gather('x', block=True)
254 gathered = view.gather('x', block=True)
255 self.assertEqual(gathered, x)
255 self.assertEqual(gathered, x)
@@ -315,7 +315,7 b' class TestView(ClusterTestCase):'
315 """push/pull pandas.TimeSeries"""
315 """push/pull pandas.TimeSeries"""
316 import pandas
316 import pandas
317
317
318 ts = pandas.TimeSeries(range(10))
318 ts = pandas.TimeSeries(list(range(10)))
319
319
320 view = self.client[-1]
320 view = self.client[-1]
321
321
@@ -329,9 +329,9 b' class TestView(ClusterTestCase):'
329 view = self.client[:]
329 view = self.client[:]
330 def f(x):
330 def f(x):
331 return x**2
331 return x**2
332 data = range(16)
332 data = list(range(16))
333 r = view.map_sync(f, data)
333 r = view.map_sync(f, data)
334 self.assertEqual(r, map(f, data))
334 self.assertEqual(r, list(map(f, data)))
335
335
336 def test_map_iterable(self):
336 def test_map_iterable(self):
337 """test map on iterables (direct)"""
337 """test map on iterables (direct)"""
@@ -356,7 +356,7 b' class TestView(ClusterTestCase):'
356 assert_array_equal(r, arr)
356 assert_array_equal(r, arr)
357
357
358 def test_scatter_gather_nonblocking(self):
358 def test_scatter_gather_nonblocking(self):
359 data = range(16)
359 data = list(range(16))
360 view = self.client[:]
360 view = self.client[:]
361 view.scatter('a', data, block=False)
361 view.scatter('a', data, block=False)
362 ar = view.gather('a', block=False)
362 ar = view.gather('a', block=False)
@@ -451,7 +451,7 b' class TestView(ClusterTestCase):'
451
451
452 @interactive
452 @interactive
453 def check_unicode(a, check):
453 def check_unicode(a, check):
454 assert isinstance(a, unicode_type), "%r is not unicode"%a
454 assert not isinstance(a, bytes), "%r is bytes, not unicode"%a
455 assert isinstance(check, bytes), "%r is not bytes"%check
455 assert isinstance(check, bytes), "%r is not bytes"%check
456 assert a.encode('utf8') == check, "%s != %s"%(a,check)
456 assert a.encode('utf8') == check, "%s != %s"%(a,check)
457
457
@@ -488,7 +488,7 b' class TestView(ClusterTestCase):'
488
488
489 def test_eval_reference(self):
489 def test_eval_reference(self):
490 v = self.client[self.client.ids[0]]
490 v = self.client[self.client.ids[0]]
491 v['g'] = range(5)
491 v['g'] = list(range(5))
492 rg = pmod.Reference('g[0]')
492 rg = pmod.Reference('g[0]')
493 echo = lambda x:x
493 echo = lambda x:x
494 self.assertEqual(v.apply_sync(echo, rg), 0)
494 self.assertEqual(v.apply_sync(echo, rg), 0)
@@ -501,7 +501,7 b' class TestView(ClusterTestCase):'
501
501
502 def test_single_engine_map(self):
502 def test_single_engine_map(self):
503 e0 = self.client[self.client.ids[0]]
503 e0 = self.client[self.client.ids[0]]
504 r = range(5)
504 r = list(range(5))
505 check = [ -1*i for i in r ]
505 check = [ -1*i for i in r ]
506 result = e0.map_sync(lambda x: -1*x, r)
506 result = e0.map_sync(lambda x: -1*x, r)
507 self.assertEqual(result, check)
507 self.assertEqual(result, check)
@@ -231,7 +231,7 b' def _push(**ns):'
231 while tmp in user_ns:
231 while tmp in user_ns:
232 tmp = tmp + '_'
232 tmp = tmp + '_'
233 try:
233 try:
234 for name, value in iteritems(ns):
234 for name, value in ns.items():
235 user_ns[tmp] = value
235 user_ns[tmp] = value
236 exec("%s = %s" % (name, tmp), user_ns)
236 exec("%s = %s" % (name, tmp), user_ns)
237 finally:
237 finally:
@@ -241,7 +241,7 b' def _push(**ns):'
241 def _pull(keys):
241 def _pull(keys):
242 """helper method for implementing `client.pull` via `client.apply`"""
242 """helper method for implementing `client.pull` via `client.apply`"""
243 if isinstance(keys, (list,tuple, set)):
243 if isinstance(keys, (list,tuple, set)):
244 return map(lambda key: eval(key, globals()), keys)
244 return [eval(key, globals()) for key in keys]
245 else:
245 else:
246 return eval(keys, globals())
246 return eval(keys, globals())
247
247
General Comments 0
You need to be logged in to leave comments. Login now