diff --git a/IPython/parallel/apps/baseapp.py b/IPython/parallel/apps/baseapp.py index b8e828c..9ea8614 100644 --- a/IPython/parallel/apps/baseapp.py +++ b/IPython/parallel/apps/baseapp.py @@ -271,5 +271,5 @@ class BaseParallelApplication(BaseIPythonApplication): " Making the likely assumption that it is."%pid ) return True - pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE)) + pids = list(map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))) return pid in pids diff --git a/IPython/parallel/client/asyncresult.py b/IPython/parallel/client/asyncresult.py index f5b3080..8320ede 100644 --- a/IPython/parallel/client/asyncresult.py +++ b/IPython/parallel/client/asyncresult.py @@ -80,7 +80,7 @@ class AsyncResult(object): self._ready = False self._outputs_ready = False self._success = None - self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ] + self._metadata = [self._client.metadata[id] for id in self.msg_ids] def __repr__(self): if self._ready: @@ -143,7 +143,7 @@ class AsyncResult(object): self._ready = self._client.wait(self.msg_ids, timeout) if self._ready: try: - results = map(self._client.results.get, self.msg_ids) + results = list(map(self._client.results.get, self.msg_ids)) self._result = results if self._single_result: r = results[0] @@ -669,10 +669,10 @@ class AsyncHubResult(AsyncResult): start = time.time() if self._ready: return - local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids) + local_ids = [m for m in self.msg_ids if m in self._client.outstanding] local_ready = self._client.wait(local_ids, timeout) if local_ready: - remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids) + remote_ids = [m for m in self.msg_ids if m not in self._client.results] if not remote_ids: self._ready = True else: @@ -687,7 +687,7 @@ class AsyncHubResult(AsyncResult): self._ready = True if self._ready: try: - results = map(self._client.results.get, self.msg_ids) + results = list(map(self._client.results.get, self.msg_ids)) self._result = results if self._single_result: r = results[0] @@ -702,6 +702,6 @@ class AsyncHubResult(AsyncResult): else: self._success = True finally: - self._metadata = map(self._client.metadata.get, self.msg_ids) + self._metadata = [self._client.metadata[mid] for mid in self.msg_ids] __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 24d4a21..4fb1e12 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -540,7 +540,7 @@ class Client(HasTraits): self._ids.append(eid) self._engines[eid] = v self._ids = sorted(self._ids) - if sorted(self._engines.keys()) != range(len(self._engines)) and \ + if sorted(self._engines.keys()) != list(range(len(self._engines))) and \ self._task_scheme == 'pure' and self._task_socket: self._stop_scheduling_tasks() @@ -581,7 +581,7 @@ class Client(HasTraits): targets = [targets] if isinstance(targets, slice): - indices = range(len(self._ids))[targets] + indices = list(range(len(self._ids))[targets]) ids = self.ids targets = [ ids[i] for i in indices ] @@ -1070,7 +1070,7 @@ class Client(HasTraits): # index access job = self.history[job] elif isinstance(job, AsyncResult): - map(theids.add, job.msg_ids) + theids.update(job.msg_ids) continue theids.add(job) if not theids.intersection(self.outstanding): @@ -1132,7 +1132,7 @@ class Client(HasTraits): msg_ids = [] if isinstance(jobs, string_types + (AsyncResult,)): jobs = [jobs] - bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult,)), jobs) + bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))] if bad_ids: raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) for j in jobs: @@ -1415,8 +1415,8 @@ class Client(HasTraits): raise TypeError("indices must be str or int, not %r"%id) theids.append(id) - local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids) - remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids) + local_ids = [msg_id for msg_id in theids if (msg_id in self.outstanding or msg_id in self.results)] + remote_ids = [msg_id for msg_id in theids if msg_id not in local_ids] # given single msg_id initially, get_result shot get the result itself, # not a length-one list @@ -1641,7 +1641,7 @@ class Client(HasTraits): if not targets: # needed as _build_targets otherwise uses all engines return [] target_ids = self._build_targets(targets)[0] - return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata) + return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids] def _build_msgids_from_jobs(self, jobs=None): """Build a list of msg_ids from "jobs" """ @@ -1650,7 +1650,7 @@ class Client(HasTraits): msg_ids = [] if isinstance(jobs, string_types + (AsyncResult,)): jobs = [jobs] - bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult)), jobs) + bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))] if bad_ids: raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) for j in jobs: @@ -1701,8 +1701,9 @@ class Client(HasTraits): msg_ids = [] msg_ids.extend(self._build_msgids_from_target(targets)) msg_ids.extend(self._build_msgids_from_jobs(jobs)) - map(self.results.pop, msg_ids) - map(self.metadata.pop, msg_ids) + for mid in msg_ids: + self.results.pop(mid) + self.metadata.pop(mid) @spin_first diff --git a/IPython/parallel/client/view.py b/IPython/parallel/client/view.py index a7d6422..d08f03c 100644 --- a/IPython/parallel/client/view.py +++ b/IPython/parallel/client/view.py @@ -32,7 +32,7 @@ from IPython.external.decorator import decorator from IPython.parallel import util from IPython.parallel.controller.dependency import Dependency, dependent -from IPython.utils.py3compat import string_types, iteritems +from IPython.utils.py3compat import string_types, iteritems, PY3 from . import map as Map from .asyncresult import AsyncResult, AsyncMapResult @@ -52,7 +52,7 @@ def save_ids(f, self, *args, **kwargs): nmsgs = len(self.client.history) - n_previous msg_ids = self.client.history[-nmsgs:] self.history.extend(msg_ids) - map(self.outstanding.add, msg_ids) + self.outstanding.update(msg_ids) return ret @decorator @@ -952,8 +952,9 @@ class LoadBalancedView(View): raise ValueError("Invalid dependency: %r"%value) if 'timeout' in kwargs: t = kwargs['timeout'] - if not isinstance(t, (int, long, float, type(None))): - raise TypeError("Invalid type for timeout: %r"%type(t)) + if not isinstance(t, (int, float, type(None))): + if (not PY3) and (not isinstance(t, long)): + raise TypeError("Invalid type for timeout: %r"%type(t)) if t is not None: if t < 0: raise ValueError("Invalid timeout: %s"%t) diff --git a/IPython/parallel/controller/dependency.py b/IPython/parallel/controller/dependency.py index 72d1660..d91753d 100644 --- a/IPython/parallel/controller/dependency.py +++ b/IPython/parallel/controller/dependency.py @@ -56,7 +56,11 @@ class dependent(object): def __init__(self, f, df, *dargs, **dkwargs): self.f = f - self.__name__ = getattr(f, '__name__', 'f') + name = getattr(f, '__name__', 'f') + if py3compat.PY3: + self.__name__ = name + else: + self.func_name = name self.df = df self.dargs = dargs self.dkwargs = dkwargs diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py index d961222..f025e4c 100755 --- a/IPython/parallel/controller/heartmonitor.py +++ b/IPython/parallel/controller/heartmonitor.py @@ -125,10 +125,12 @@ class HeartMonitor(LoggingConfigurable): goodhearts = self.hearts.intersection(self.responses) missed_beats = self.hearts.difference(goodhearts) newhearts = self.responses.difference(goodhearts) - map(self.handle_new_heart, newhearts) + for heart in newhearts: + self.handle_new_heart(heart) heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation, self.hearts) - map(self.handle_heart_failure, heartfailures) + for failure in heartfailures: + self.handle_heart_failure(failure) self.on_probation = on_probation self.responses = set() #print self.on_probation, self.hearts diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index e16c94b..830fbc8 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -193,9 +193,9 @@ class HubFactory(RegistrationFactory): help="IP on which to listen for monitor messages. [default: loopback]") monitor_transport = Unicode('tcp', config=True, help="0MQ transport for monitor messages. [default : tcp]") - + _client_ip_default = _monitor_ip_default = _engine_ip_default - + monitor_url = Unicode('') @@ -1190,7 +1190,7 @@ class Hub(SessionFactory): except Exception: reply = error.wrap_exception() else: - pending = filter(lambda m: m in self.pending, msg_ids) + pending = [m for m in msg_ids if (m in self.pending)] if pending: try: raise IndexError("msg pending: %r" % pending[0]) @@ -1312,7 +1312,7 @@ class Hub(SessionFactory): 'io' : io_dict, } if rec['result_buffers']: - buffers = map(bytes, rec['result_buffers']) + buffers = list(map(bytes, rec['result_buffers'])) else: buffers = [] diff --git a/IPython/parallel/controller/mongodb.py b/IPython/parallel/controller/mongodb.py index 1d987e3..416f97f 100644 --- a/IPython/parallel/controller/mongodb.py +++ b/IPython/parallel/controller/mongodb.py @@ -62,7 +62,7 @@ class MongoDB(BaseDB): def _binary_buffers(self, rec): for key in ('buffers', 'result_buffers'): if rec.get(key, None): - rec[key] = map(Binary, rec[key]) + rec[key] = list(map(Binary, rec[key])) return rec def add_record(self, msg_id, rec): diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 1f528aa..ecd73a5 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -364,7 +364,7 @@ class TaskScheduler(SessionFactory): date=datetime.now(), ) msg = self.session.msg('apply_reply', content, parent=parent, metadata=md) - raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents)) + raw_reply = list(map(zmq.Message, self.session.serialize(msg, ident=idents))) # and dispatch it self.dispatch_result(raw_reply) @@ -402,8 +402,7 @@ class TaskScheduler(SessionFactory): # get targets as a set of bytes objects # from a list of unicode objects targets = md.get('targets', []) - targets = map(cast_bytes, targets) - targets = set(targets) + targets = set(map(cast_bytes, targets)) retries = md.get('retries', 0) self.retries[msg_id] = retries @@ -515,7 +514,7 @@ class TaskScheduler(SessionFactory): def available_engines(self): """return a list of available engine indices based on HWM""" if not self.hwm: - return range(len(self.targets)) + return list(range(len(self.targets))) available = [] for idx in range(len(self.targets)): if self.loads[idx] < self.hwm: @@ -547,7 +546,7 @@ class TaskScheduler(SessionFactory): # check follow return job.follow.check(self.completed[target], self.failed[target]) - indices = filter(can_run, available) + indices = list(filter(can_run, available)) if not indices: # couldn't run diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py index 94a07e3..2187af1 100644 --- a/IPython/parallel/controller/sqlitedb.py +++ b/IPython/parallel/controller/sqlitedb.py @@ -76,7 +76,7 @@ def _adapt_bufs(bufs): # this is *horrible* # copy buffers into single list and pickle it: if bufs and isinstance(bufs[0], (bytes, buffer)): - return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1)) + return sqlite3.Binary(pickle.dumps(list(map(bytes, bufs)),-1)) elif bufs: return bufs else: diff --git a/IPython/parallel/tests/test_asyncresult.py b/IPython/parallel/tests/test_asyncresult.py index db61cf6..27d8500 100644 --- a/IPython/parallel/tests/test_asyncresult.py +++ b/IPython/parallel/tests/test_asyncresult.py @@ -83,7 +83,7 @@ class AsyncResultTest(ClusterTestCase): def test_get_dict_single(self): view = self.client[-1] - for v in (range(5), 5, ('abc', 'def'), 'string'): + for v in (list(range(5)), 5, ('abc', 'def'), 'string'): ar = view.apply_async(echo, v) self.assertEqual(ar.get(), v) d = ar.get_dict() @@ -146,11 +146,11 @@ class AsyncResultTest(ClusterTestCase): def test_len(self): v = self.client.load_balanced_view() - ar = v.map_async(lambda x: x, range(10)) + ar = v.map_async(lambda x: x, list(range(10))) self.assertEqual(len(ar), 10) - ar = v.apply_async(lambda x: x, range(10)) + ar = v.apply_async(lambda x: x, list(range(10))) self.assertEqual(len(ar), 1) - ar = self.client[:].apply_async(lambda x: x, range(10)) + ar = self.client[:].apply_async(lambda x: x, list(range(10))) self.assertEqual(len(ar), len(self.client.ids)) def test_wall_time_single(self): diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py index 34d9c88..207841f 100644 --- a/IPython/parallel/tests/test_client.py +++ b/IPython/parallel/tests/test_client.py @@ -95,7 +95,7 @@ class TestClient(ClusterTestCase): def double(x): return x*2 - seq = range(100) + seq = list(range(100)) ref = [ double(x) for x in seq ] # add some engines, which should be used diff --git a/IPython/parallel/tests/test_dependency.py b/IPython/parallel/tests/test_dependency.py index 024d16a..ddcc749 100644 --- a/IPython/parallel/tests/test_dependency.py +++ b/IPython/parallel/tests/test_dependency.py @@ -41,9 +41,9 @@ def wait(n): def func(x): return x*x -mixed = map(str, range(10)) -completed = map(str, range(0,10,2)) -failed = map(str, range(1,10,2)) +mixed = list(map(str, range(10))) +completed = list(map(str, range(0,10,2))) +failed = list(map(str, range(1,10,2))) class DependencyTest(ClusterTestCase): @@ -74,12 +74,12 @@ class DependencyTest(ClusterTestCase): def test_require_imports(self): """test that @require imports names""" @self.cancan - @pmod.require('urllib') + @pmod.require('base64') @interactive - def encode(dikt): - return urllib.urlencode(dikt) + def encode(arg): + return base64.b64encode(arg) # must pass through canning to properly connect namespaces - self.assertEqual(encode(dict(a=5)), 'a=5') + self.assertEqual(encode(b'foo'), b'Zm9v') def test_success_only(self): dep = pmod.Dependency(mixed, success=True, failure=False) diff --git a/IPython/parallel/tests/test_lbview.py b/IPython/parallel/tests/test_lbview.py index 27bcd86..96939a8 100644 --- a/IPython/parallel/tests/test_lbview.py +++ b/IPython/parallel/tests/test_lbview.py @@ -55,17 +55,17 @@ class TestLoadBalancedView(ClusterTestCase): def test_map(self): def f(x): return x**2 - data = range(16) + data = list(range(16)) r = self.view.map_sync(f, data) - self.assertEqual(r, map(f, data)) + self.assertEqual(r, list(map(f, data))) def test_map_generator(self): def f(x): return x**2 - data = range(16) + data = list(range(16)) r = self.view.map_sync(f, iter(data)) - self.assertEqual(r, map(f, iter(data))) + self.assertEqual(r, list(map(f, iter(data)))) def test_map_short_first(self): def f(x,y): @@ -74,11 +74,11 @@ class TestLoadBalancedView(ClusterTestCase): if x is None: return x return x*y - data = range(10) - data2 = range(4) + data = list(range(10)) + data2 = list(range(4)) r = self.view.map_sync(f, data, data2) - self.assertEqual(r, map(f, data, data2)) + self.assertEqual(r, list(map(f, data, data2))) def test_map_short_last(self): def f(x,y): @@ -87,11 +87,11 @@ class TestLoadBalancedView(ClusterTestCase): if x is None: return x return x*y - data = range(4) - data2 = range(10) + data = list(range(4)) + data2 = list(range(10)) r = self.view.map_sync(f, data, data2) - self.assertEqual(r, map(f, data, data2)) + self.assertEqual(r, list(map(f, data, data2))) def test_map_unordered(self): def f(x): @@ -100,8 +100,8 @@ class TestLoadBalancedView(ClusterTestCase): import time time.sleep(0.05*x) return x**2 - data = range(16,0,-1) - reference = map(f, data) + data = list(range(16,0,-1)) + reference = list(map(f, data)) amr = self.view.map_async(slow_f, data, ordered=False) self.assertTrue(isinstance(amr, pmod.AsyncMapResult)) @@ -119,8 +119,8 @@ class TestLoadBalancedView(ClusterTestCase): import time time.sleep(0.05*x) return x**2 - data = range(16,0,-1) - reference = map(f, data) + data = list(range(16,0,-1)) + reference = list(map(f, data)) amr = self.view.map_async(slow_f, data) self.assertTrue(isinstance(amr, pmod.AsyncMapResult)) diff --git a/IPython/parallel/tests/test_view.py b/IPython/parallel/tests/test_view.py index f3c9f5e..bb3625c 100644 --- a/IPython/parallel/tests/test_view.py +++ b/IPython/parallel/tests/test_view.py @@ -69,7 +69,7 @@ class TestView(ClusterTestCase): def test_push_pull(self): """test pushing and pulling""" - data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'}) + data = dict(a=10, b=1.05, c=list(range(10)), d={'e':(1,2),'f':'hi'}) t = self.client.ids[-1] v = self.client[t] push = v.push @@ -230,7 +230,7 @@ class TestView(ClusterTestCase): def test_scatter_gather(self): view = self.client[:] - seq1 = range(16) + seq1 = list(range(16)) view.scatter('a', seq1) seq2 = view.gather('a', block=True) self.assertEqual(seq2, seq1) @@ -249,7 +249,7 @@ class TestView(ClusterTestCase): def test_scatter_gather_lazy(self): """scatter/gather with targets='all'""" view = self.client.direct_view(targets='all') - x = range(64) + x = list(range(64)) view.scatter('x', x) gathered = view.gather('x', block=True) self.assertEqual(gathered, x) @@ -315,7 +315,7 @@ class TestView(ClusterTestCase): """push/pull pandas.TimeSeries""" import pandas - ts = pandas.TimeSeries(range(10)) + ts = pandas.TimeSeries(list(range(10))) view = self.client[-1] @@ -329,9 +329,9 @@ class TestView(ClusterTestCase): view = self.client[:] def f(x): return x**2 - data = range(16) + data = list(range(16)) r = view.map_sync(f, data) - self.assertEqual(r, map(f, data)) + self.assertEqual(r, list(map(f, data))) def test_map_iterable(self): """test map on iterables (direct)""" @@ -356,7 +356,7 @@ class TestView(ClusterTestCase): assert_array_equal(r, arr) def test_scatter_gather_nonblocking(self): - data = range(16) + data = list(range(16)) view = self.client[:] view.scatter('a', data, block=False) ar = view.gather('a', block=False) @@ -451,7 +451,7 @@ class TestView(ClusterTestCase): @interactive def check_unicode(a, check): - assert isinstance(a, unicode_type), "%r is not unicode"%a + assert not isinstance(a, bytes), "%r is bytes, not unicode"%a assert isinstance(check, bytes), "%r is not bytes"%check assert a.encode('utf8') == check, "%s != %s"%(a,check) @@ -488,7 +488,7 @@ class TestView(ClusterTestCase): def test_eval_reference(self): v = self.client[self.client.ids[0]] - v['g'] = range(5) + v['g'] = list(range(5)) rg = pmod.Reference('g[0]') echo = lambda x:x self.assertEqual(v.apply_sync(echo, rg), 0) @@ -501,7 +501,7 @@ class TestView(ClusterTestCase): def test_single_engine_map(self): e0 = self.client[self.client.ids[0]] - r = range(5) + r = list(range(5)) check = [ -1*i for i in r ] result = e0.map_sync(lambda x: -1*x, r) self.assertEqual(result, check) diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py index b0ca794..d8344e9 100644 --- a/IPython/parallel/util.py +++ b/IPython/parallel/util.py @@ -231,7 +231,7 @@ def _push(**ns): while tmp in user_ns: tmp = tmp + '_' try: - for name, value in iteritems(ns): + for name, value in ns.items(): user_ns[tmp] = value exec("%s = %s" % (name, tmp), user_ns) finally: @@ -241,7 +241,7 @@ def _push(**ns): def _pull(keys): """helper method for implementing `client.pull` via `client.apply`""" if isinstance(keys, (list,tuple, set)): - return map(lambda key: eval(key, globals()), keys) + return [eval(key, globals()) for key in keys] else: return eval(keys, globals())