From c4ee496be1aa29bcd0d0a6d123e4d7dfde886e45 2013-10-29 16:15:56
From: Thomas Kluyver <takowl@gmail.com>
Date: 2013-10-29 16:15:56
Subject: [PATCH] Fix parallel test suite

Get IPython.parallel and its test suite passing on Python 3, where
map() and filter() return iterators and range() is a lazy sequence
rather than a list. Wrap their results in list() or rewrite them as
comprehensions wherever a real list is required, have the tests pass
list(range(...)) when a list is expected, guard the removed `long`
type behind a PY3 check, set __name__/func_name on `dependent`
according to the Python version, and replace the urllib.urlencode
based @require test with base64.b64encode.
---
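A minimal sketch of the Python 2 vs 3 behaviour change that motivates
the list() wrapping throughout this patch; the names below are
illustrative only, not taken from the IPython code:

    # Runs under both Python 2 and Python 3.
    msg_ids = ['a', 'bb', 'ccc']       # illustrative stand-ins for real msg_ids
    results = map(len, msg_ids)        # Python 2: a list; Python 3: a lazy iterator

    try:
        results[0]                     # fine on Python 2, TypeError on Python 3
    except TypeError:
        results = list(map(len, msg_ids))  # the pattern applied throughout this patch

    assert len(results) == 3 and results[0] == 1

    # filter() and range() changed the same way; comprehensions sidestep it:
    pending = [m for m in msg_ids if len(m) > 1]
    assert pending == ['bb', 'ccc']

The test changes follow the same theme: range(...) arguments become
list(range(...)) because the tests compare results against real lists.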

diff --git a/IPython/parallel/apps/baseapp.py b/IPython/parallel/apps/baseapp.py
index b8e828c..9ea8614 100644
--- a/IPython/parallel/apps/baseapp.py
+++ b/IPython/parallel/apps/baseapp.py
@@ -271,5 +271,5 @@ class BaseParallelApplication(BaseIPythonApplication):
                     " Making the likely assumption that it is."%pid
                 )
                 return True
-            pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
+            pids = list(map(int, re.findall(r'^\W*\d+', output, re.MULTILINE)))
             return pid in pids
diff --git a/IPython/parallel/client/asyncresult.py b/IPython/parallel/client/asyncresult.py
index f5b3080..8320ede 100644
--- a/IPython/parallel/client/asyncresult.py
+++ b/IPython/parallel/client/asyncresult.py
@@ -80,7 +80,7 @@ class AsyncResult(object):
         self._ready = False
         self._outputs_ready = False
         self._success = None
-        self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
+        self._metadata = [self._client.metadata[id] for id in self.msg_ids]
 
     def __repr__(self):
         if self._ready:
@@ -143,7 +143,7 @@ class AsyncResult(object):
         self._ready = self._client.wait(self.msg_ids, timeout)
         if self._ready:
             try:
-                results = map(self._client.results.get, self.msg_ids)
+                results = list(map(self._client.results.get, self.msg_ids))
                 self._result = results
                 if self._single_result:
                     r = results[0]
@@ -669,10 +669,10 @@ class AsyncHubResult(AsyncResult):
         start = time.time()
         if self._ready:
             return
-        local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
+        local_ids = [m for m in self.msg_ids if m in self._client.outstanding]
         local_ready = self._client.wait(local_ids, timeout)
         if local_ready:
-            remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
+            remote_ids = [m for m in self.msg_ids if m not in self._client.results]
             if not remote_ids:
                 self._ready = True
             else:
@@ -687,7 +687,7 @@ class AsyncHubResult(AsyncResult):
                     self._ready = True
         if self._ready:
             try:
-                results = map(self._client.results.get, self.msg_ids)
+                results = list(map(self._client.results.get, self.msg_ids))
                 self._result = results
                 if self._single_result:
                     r = results[0]
@@ -702,6 +702,6 @@ class AsyncHubResult(AsyncResult):
             else:
                 self._success = True
             finally:
-                self._metadata = map(self._client.metadata.get, self.msg_ids)
+                self._metadata = [self._client.metadata[mid] for mid in self.msg_ids]
 
 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py
index 24d4a21..4fb1e12 100644
--- a/IPython/parallel/client/client.py
+++ b/IPython/parallel/client/client.py
@@ -540,7 +540,7 @@ class Client(HasTraits):
                 self._ids.append(eid)
             self._engines[eid] = v
         self._ids = sorted(self._ids)
-        if sorted(self._engines.keys()) != range(len(self._engines)) and \
+        if sorted(self._engines.keys()) != list(range(len(self._engines))) and \
                         self._task_scheme == 'pure' and self._task_socket:
             self._stop_scheduling_tasks()
 
@@ -581,7 +581,7 @@ class Client(HasTraits):
             targets = [targets]
 
         if isinstance(targets, slice):
-            indices = range(len(self._ids))[targets]
+            indices = list(range(len(self._ids))[targets])
             ids = self.ids
             targets = [ ids[i] for i in indices ]
 
@@ -1070,7 +1070,7 @@ class Client(HasTraits):
                     # index access
                     job = self.history[job]
                 elif isinstance(job, AsyncResult):
-                    map(theids.add, job.msg_ids)
+                    theids.update(job.msg_ids)
                     continue
                 theids.add(job)
         if not theids.intersection(self.outstanding):
@@ -1132,7 +1132,7 @@ class Client(HasTraits):
         msg_ids = []
         if isinstance(jobs, string_types + (AsyncResult,)):
             jobs = [jobs]
-        bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult,)), jobs)
+        bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
         if bad_ids:
             raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
         for j in jobs:
@@ -1415,8 +1415,8 @@ class Client(HasTraits):
                 raise TypeError("indices must be str or int, not %r"%id)
             theids.append(id)
 
-        local_ids = filter(lambda msg_id: msg_id in self.outstanding or msg_id in self.results, theids)
-        remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
+        local_ids = [msg_id for msg_id in theids if (msg_id in self.outstanding or msg_id in self.results)]
+        remote_ids = [msg_id for msg_id in theids if msg_id not in local_ids]
         
         # given single msg_id initially, get_result shot get the result itself,
         # not a length-one list
@@ -1641,7 +1641,7 @@ class Client(HasTraits):
         if not targets: # needed as _build_targets otherwise uses all engines
             return []
         target_ids = self._build_targets(targets)[0] 
-        return filter(lambda md_id: self.metadata[md_id]["engine_uuid"] in target_ids, self.metadata)
+        return [md_id for md_id in self.metadata if self.metadata[md_id]["engine_uuid"] in target_ids]
     
     def _build_msgids_from_jobs(self, jobs=None):
         """Build a list of msg_ids from "jobs" """
@@ -1650,7 +1650,7 @@ class Client(HasTraits):
         msg_ids = []
         if isinstance(jobs, string_types + (AsyncResult,)):
             jobs = [jobs]
-        bad_ids = filter(lambda obj: not isinstance(obj, string_types + (AsyncResult)), jobs)
+        bad_ids = [obj for obj in jobs if not isinstance(obj, string_types + (AsyncResult,))]
         if bad_ids:
             raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
         for j in jobs:
@@ -1701,8 +1701,9 @@ class Client(HasTraits):
             msg_ids = []
             msg_ids.extend(self._build_msgids_from_target(targets))
             msg_ids.extend(self._build_msgids_from_jobs(jobs))
-            map(self.results.pop, msg_ids)
-            map(self.metadata.pop, msg_ids)
+            for mid in msg_ids:
+                self.results.pop(mid)
+                self.metadata.pop(mid)
 
 
     @spin_first
diff --git a/IPython/parallel/client/view.py b/IPython/parallel/client/view.py
index a7d6422..d08f03c 100644
--- a/IPython/parallel/client/view.py
+++ b/IPython/parallel/client/view.py
@@ -32,7 +32,7 @@ from IPython.external.decorator import decorator
 
 from IPython.parallel import util
 from IPython.parallel.controller.dependency import Dependency, dependent
-from IPython.utils.py3compat import string_types, iteritems
+from IPython.utils.py3compat import string_types, iteritems, PY3
 
 from . import map as Map
 from .asyncresult import AsyncResult, AsyncMapResult
@@ -52,7 +52,7 @@ def save_ids(f, self, *args, **kwargs):
         nmsgs = len(self.client.history) - n_previous
         msg_ids = self.client.history[-nmsgs:]
         self.history.extend(msg_ids)
-        map(self.outstanding.add, msg_ids)
+        self.outstanding.update(msg_ids)
     return ret
 
 @decorator
@@ -952,8 +952,9 @@ class LoadBalancedView(View):
                     raise ValueError("Invalid dependency: %r"%value)
         if 'timeout' in kwargs:
             t = kwargs['timeout']
-            if not isinstance(t, (int, long, float, type(None))):
-                raise TypeError("Invalid type for timeout: %r"%type(t))
+            if not isinstance(t, (int, float, type(None))):
+                if (not PY3) and (not isinstance(t, long)):
+                    raise TypeError("Invalid type for timeout: %r"%type(t))
             if t is not None:
                 if t < 0:
                     raise ValueError("Invalid timeout: %s"%t)
diff --git a/IPython/parallel/controller/dependency.py b/IPython/parallel/controller/dependency.py
index 72d1660..d91753d 100644
--- a/IPython/parallel/controller/dependency.py
+++ b/IPython/parallel/controller/dependency.py
@@ -56,7 +56,11 @@ class dependent(object):
 
     def __init__(self, f, df, *dargs, **dkwargs):
         self.f = f
-        self.__name__ = getattr(f, '__name__', 'f')
+        name = getattr(f, '__name__', 'f')
+        if py3compat.PY3:
+            self.__name__ = name
+        else:
+            self.func_name = name
         self.df = df
         self.dargs = dargs
         self.dkwargs = dkwargs
diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py
index d961222..f025e4c 100755
--- a/IPython/parallel/controller/heartmonitor.py
+++ b/IPython/parallel/controller/heartmonitor.py
@@ -125,10 +125,12 @@ class HeartMonitor(LoggingConfigurable):
         goodhearts = self.hearts.intersection(self.responses)
         missed_beats = self.hearts.difference(goodhearts)
         newhearts = self.responses.difference(goodhearts)
-        map(self.handle_new_heart, newhearts)
+        for heart in newhearts:
+            self.handle_new_heart(heart)
         heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
                                                          self.hearts)
-        map(self.handle_heart_failure, heartfailures)
+        for failure in heartfailures:
+            self.handle_heart_failure(failure)
         self.on_probation = on_probation
         self.responses = set()
         #print self.on_probation, self.hearts
diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py
index e16c94b..830fbc8 100644
--- a/IPython/parallel/controller/hub.py
+++ b/IPython/parallel/controller/hub.py
@@ -193,9 +193,9 @@ class HubFactory(RegistrationFactory):
         help="IP on which to listen for monitor messages. [default: loopback]")
     monitor_transport = Unicode('tcp', config=True,
         help="0MQ transport for monitor messages. [default : tcp]")
-    
+
     _client_ip_default = _monitor_ip_default = _engine_ip_default
-    
+
 
     monitor_url = Unicode('')
 
@@ -1190,7 +1190,7 @@ class Hub(SessionFactory):
             except Exception:
                 reply = error.wrap_exception()
         else:
-            pending = filter(lambda m: m in self.pending, msg_ids)
+            pending = [m for m in msg_ids if m in self.pending]
             if pending:
                 try:
                     raise IndexError("msg pending: %r" % pending[0])
@@ -1312,7 +1312,7 @@ class Hub(SessionFactory):
             'io' : io_dict,
         }
         if rec['result_buffers']:
-            buffers = map(bytes, rec['result_buffers'])
+            buffers = list(map(bytes, rec['result_buffers']))
         else:
             buffers = []
 
diff --git a/IPython/parallel/controller/mongodb.py b/IPython/parallel/controller/mongodb.py
index 1d987e3..416f97f 100644
--- a/IPython/parallel/controller/mongodb.py
+++ b/IPython/parallel/controller/mongodb.py
@@ -62,7 +62,7 @@ class MongoDB(BaseDB):
     def _binary_buffers(self, rec):
         for key in ('buffers', 'result_buffers'):
             if rec.get(key, None):
-                rec[key] = map(Binary, rec[key])
+                rec[key] = list(map(Binary, rec[key]))
         return rec
     
     def add_record(self, msg_id, rec):
diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py
index 1f528aa..ecd73a5 100644
--- a/IPython/parallel/controller/scheduler.py
+++ b/IPython/parallel/controller/scheduler.py
@@ -364,7 +364,7 @@ class TaskScheduler(SessionFactory):
                 date=datetime.now(),
             )
             msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
-            raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
+            raw_reply = list(map(zmq.Message, self.session.serialize(msg, ident=idents)))
             # and dispatch it
             self.dispatch_result(raw_reply)
 
@@ -402,8 +402,7 @@ class TaskScheduler(SessionFactory):
         # get targets as a set of bytes objects
         # from a list of unicode objects
         targets = md.get('targets', [])
-        targets = map(cast_bytes, targets)
-        targets = set(targets)
+        targets = set(map(cast_bytes, targets))
 
         retries = md.get('retries', 0)
         self.retries[msg_id] = retries
@@ -515,7 +514,7 @@ class TaskScheduler(SessionFactory):
     def available_engines(self):
         """return a list of available engine indices based on HWM"""
         if not self.hwm:
-            return range(len(self.targets))
+            return list(range(len(self.targets)))
         available = []
         for idx in range(len(self.targets)):
             if self.loads[idx] < self.hwm:
@@ -547,7 +546,7 @@ class TaskScheduler(SessionFactory):
                 # check follow
                 return job.follow.check(self.completed[target], self.failed[target])
 
-            indices = filter(can_run, available)
+            indices = list(filter(can_run, available))
 
             if not indices:
                 # couldn't run
diff --git a/IPython/parallel/controller/sqlitedb.py b/IPython/parallel/controller/sqlitedb.py
index 94a07e3..2187af1 100644
--- a/IPython/parallel/controller/sqlitedb.py
+++ b/IPython/parallel/controller/sqlitedb.py
@@ -76,7 +76,7 @@ def _adapt_bufs(bufs):
     # this is *horrible*
     # copy buffers into single list and pickle it:
     if bufs and isinstance(bufs[0], (bytes, buffer)):
-        return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
+        return sqlite3.Binary(pickle.dumps(list(map(bytes, bufs)),-1))
     elif bufs:
         return bufs
     else:
diff --git a/IPython/parallel/tests/test_asyncresult.py b/IPython/parallel/tests/test_asyncresult.py
index db61cf6..27d8500 100644
--- a/IPython/parallel/tests/test_asyncresult.py
+++ b/IPython/parallel/tests/test_asyncresult.py
@@ -83,7 +83,7 @@ class AsyncResultTest(ClusterTestCase):
     
     def test_get_dict_single(self):
         view = self.client[-1]
-        for v in (range(5), 5, ('abc', 'def'), 'string'):
+        for v in (list(range(5)), 5, ('abc', 'def'), 'string'):
             ar = view.apply_async(echo, v)
             self.assertEqual(ar.get(), v)
             d = ar.get_dict()
@@ -146,11 +146,11 @@ class AsyncResultTest(ClusterTestCase):
     
     def test_len(self):
         v = self.client.load_balanced_view()
-        ar = v.map_async(lambda x: x, range(10))
+        ar = v.map_async(lambda x: x, list(range(10)))
         self.assertEqual(len(ar), 10)
-        ar = v.apply_async(lambda x: x, range(10))
+        ar = v.apply_async(lambda x: x, list(range(10)))
         self.assertEqual(len(ar), 1)
-        ar = self.client[:].apply_async(lambda x: x, range(10))
+        ar = self.client[:].apply_async(lambda x: x, list(range(10)))
         self.assertEqual(len(ar), len(self.client.ids))
     
     def test_wall_time_single(self):
diff --git a/IPython/parallel/tests/test_client.py b/IPython/parallel/tests/test_client.py
index 34d9c88..207841f 100644
--- a/IPython/parallel/tests/test_client.py
+++ b/IPython/parallel/tests/test_client.py
@@ -95,7 +95,7 @@ class TestClient(ClusterTestCase):
         
         def double(x):
             return x*2
-        seq = range(100)
+        seq = list(range(100))
         ref = [ double(x) for x in seq ]
         
         # add some engines, which should be used
diff --git a/IPython/parallel/tests/test_dependency.py b/IPython/parallel/tests/test_dependency.py
index 024d16a..ddcc749 100644
--- a/IPython/parallel/tests/test_dependency.py
+++ b/IPython/parallel/tests/test_dependency.py
@@ -41,9 +41,9 @@ def wait(n):
 def func(x):
     return x*x
 
-mixed = map(str, range(10))
-completed = map(str, range(0,10,2))
-failed = map(str, range(1,10,2))
+mixed = list(map(str, range(10)))
+completed = list(map(str, range(0,10,2)))
+failed = list(map(str, range(1,10,2)))
 
 class DependencyTest(ClusterTestCase):
     
@@ -74,12 +74,12 @@ class DependencyTest(ClusterTestCase):
     def test_require_imports(self):
         """test that @require imports names"""
         @self.cancan
-        @pmod.require('urllib')
+        @pmod.require('base64')
         @interactive
-        def encode(dikt):
-            return urllib.urlencode(dikt)
+        def encode(arg):
+            return base64.b64encode(arg)
         # must pass through canning to properly connect namespaces
-        self.assertEqual(encode(dict(a=5)), 'a=5')
+        self.assertEqual(encode(b'foo'), b'Zm9v')
     
     def test_success_only(self):
         dep = pmod.Dependency(mixed, success=True, failure=False)
diff --git a/IPython/parallel/tests/test_lbview.py b/IPython/parallel/tests/test_lbview.py
index 27bcd86..96939a8 100644
--- a/IPython/parallel/tests/test_lbview.py
+++ b/IPython/parallel/tests/test_lbview.py
@@ -55,17 +55,17 @@ class TestLoadBalancedView(ClusterTestCase):
     def test_map(self):
         def f(x):
             return x**2
-        data = range(16)
+        data = list(range(16))
         r = self.view.map_sync(f, data)
-        self.assertEqual(r, map(f, data))
+        self.assertEqual(r, list(map(f, data)))
     
     def test_map_generator(self):
         def f(x):
             return x**2
         
-        data = range(16)
+        data = list(range(16))
         r = self.view.map_sync(f, iter(data))
-        self.assertEqual(r, map(f, iter(data)))
+        self.assertEqual(r, list(map(f, iter(data))))
     
     def test_map_short_first(self):
         def f(x,y):
@@ -74,11 +74,11 @@ class TestLoadBalancedView(ClusterTestCase):
             if x is None:
                 return x
             return x*y
-        data = range(10)
-        data2 = range(4)
+        data = list(range(10))
+        data2 = list(range(4))
         
         r = self.view.map_sync(f, data, data2)
-        self.assertEqual(r, map(f, data, data2))
+        self.assertEqual(r, list(map(f, data, data2)))
 
     def test_map_short_last(self):
         def f(x,y):
@@ -87,11 +87,11 @@ class TestLoadBalancedView(ClusterTestCase):
             if x is None:
                 return x
             return x*y
-        data = range(4)
-        data2 = range(10)
+        data = list(range(4))
+        data2 = list(range(10))
         
         r = self.view.map_sync(f, data, data2)
-        self.assertEqual(r, map(f, data, data2))
+        self.assertEqual(r, list(map(f, data, data2)))
 
     def test_map_unordered(self):
         def f(x):
@@ -100,8 +100,8 @@ class TestLoadBalancedView(ClusterTestCase):
             import time
             time.sleep(0.05*x)
             return x**2
-        data = range(16,0,-1)
-        reference = map(f, data)
+        data = list(range(16,0,-1))
+        reference = list(map(f, data))
         
         amr = self.view.map_async(slow_f, data, ordered=False)
         self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
@@ -119,8 +119,8 @@ class TestLoadBalancedView(ClusterTestCase):
             import time
             time.sleep(0.05*x)
             return x**2
-        data = range(16,0,-1)
-        reference = map(f, data)
+        data = list(range(16,0,-1))
+        reference = list(map(f, data))
         
         amr = self.view.map_async(slow_f, data)
         self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
diff --git a/IPython/parallel/tests/test_view.py b/IPython/parallel/tests/test_view.py
index f3c9f5e..bb3625c 100644
--- a/IPython/parallel/tests/test_view.py
+++ b/IPython/parallel/tests/test_view.py
@@ -69,7 +69,7 @@ class TestView(ClusterTestCase):
     
     def test_push_pull(self):
         """test pushing and pulling"""
-        data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
+        data = dict(a=10, b=1.05, c=list(range(10)), d={'e':(1,2),'f':'hi'})
         t = self.client.ids[-1]
         v = self.client[t]
         push = v.push
@@ -230,7 +230,7 @@ class TestView(ClusterTestCase):
 
     def test_scatter_gather(self):
         view = self.client[:]
-        seq1 = range(16)
+        seq1 = list(range(16))
         view.scatter('a', seq1)
         seq2 = view.gather('a', block=True)
         self.assertEqual(seq2, seq1)
@@ -249,7 +249,7 @@ class TestView(ClusterTestCase):
     def test_scatter_gather_lazy(self):
         """scatter/gather with targets='all'"""
         view = self.client.direct_view(targets='all')
-        x = range(64)
+        x = list(range(64))
         view.scatter('x', x)
         gathered = view.gather('x', block=True)
         self.assertEqual(gathered, x)
@@ -315,7 +315,7 @@ class TestView(ClusterTestCase):
         """push/pull pandas.TimeSeries"""
         import pandas
         
-        ts = pandas.TimeSeries(range(10))
+        ts = pandas.TimeSeries(list(range(10)))
         
         view = self.client[-1]
         
@@ -329,9 +329,9 @@ class TestView(ClusterTestCase):
         view = self.client[:]
         def f(x):
             return x**2
-        data = range(16)
+        data = list(range(16))
         r = view.map_sync(f, data)
-        self.assertEqual(r, map(f, data))
+        self.assertEqual(r, list(map(f, data)))
     
     def test_map_iterable(self):
         """test map on iterables (direct)"""
@@ -356,7 +356,7 @@ class TestView(ClusterTestCase):
         assert_array_equal(r, arr)
     
     def test_scatter_gather_nonblocking(self):
-        data = range(16)
+        data = list(range(16))
         view = self.client[:]
         view.scatter('a', data, block=False)
         ar = view.gather('a', block=False)
@@ -451,7 +451,7 @@ class TestView(ClusterTestCase):
         
         @interactive
         def check_unicode(a, check):
-            assert isinstance(a, unicode_type), "%r is not unicode"%a
+            assert not isinstance(a, bytes), "%r is bytes, not unicode"%a
             assert isinstance(check, bytes), "%r is not bytes"%check
             assert a.encode('utf8') == check, "%s != %s"%(a,check)
         
@@ -488,7 +488,7 @@ class TestView(ClusterTestCase):
     
     def test_eval_reference(self):
         v = self.client[self.client.ids[0]]
-        v['g'] = range(5)
+        v['g'] = list(range(5))
         rg = pmod.Reference('g[0]')
         echo = lambda x:x
         self.assertEqual(v.apply_sync(echo, rg), 0)
@@ -501,7 +501,7 @@ class TestView(ClusterTestCase):
 
     def test_single_engine_map(self):
         e0 = self.client[self.client.ids[0]]
-        r = range(5)
+        r = list(range(5))
         check = [ -1*i for i in r ]
         result = e0.map_sync(lambda x: -1*x, r)
         self.assertEqual(result, check)
diff --git a/IPython/parallel/util.py b/IPython/parallel/util.py
index b0ca794..d8344e9 100644
--- a/IPython/parallel/util.py
+++ b/IPython/parallel/util.py
@@ -231,7 +231,7 @@ def _push(**ns):
     while tmp in user_ns:
         tmp = tmp + '_'
     try:
-        for name, value in iteritems(ns):
+        for name, value in ns.items():
             user_ns[tmp] = value
             exec("%s = %s" % (name, tmp), user_ns)
     finally:
@@ -241,7 +241,7 @@ def _push(**ns):
 def _pull(keys):
     """helper method for implementing `client.pull` via `client.apply`"""
     if isinstance(keys, (list,tuple, set)):
-        return map(lambda key: eval(key, globals()), keys)
+        return [eval(key, globals()) for key in keys]
     else:
         return eval(keys, globals())