From ee81bbeb65bccca826f7399e6278033c80014992 2008-07-22 04:51:21 From: Brian E Granger Date: 2008-07-22 04:51:21 Subject: [PATCH] Fixing more tests and examples after the task, map and @parallel work. --- diff --git a/IPython/frontend/tests/test_frontendbase.py b/IPython/frontend/tests/test_frontendbase.py index f74c43e..54c470d 100644 --- a/IPython/frontend/tests/test_frontendbase.py +++ b/IPython/frontend/tests/test_frontendbase.py @@ -83,7 +83,7 @@ class TestAsyncFrontendBase(unittest.TestCase): d.addCallback(self.checkBlockID, expected='TEST_ID') def test_blockID_added_to_failure(self): - block = "raise Exception()" + block = "raise Exception()" d = self.fb.execute(block,blockID='TEST_ID') d.addErrback(self.checkFailureID, expected='TEST_ID') diff --git a/IPython/kernel/task.py b/IPython/kernel/task.py index fc2ebea..9b6f0ae 100644 --- a/IPython/kernel/task.py +++ b/IPython/kernel/task.py @@ -476,7 +476,7 @@ class TaskResult(object): return self._ns def _setNS(self, v): - raise Exception("I am protected!") + raise Exception("the ns attribute cannot be changed") ns = property(_getNS, _setNS) diff --git a/IPython/kernel/tests/engineservicetest.py b/IPython/kernel/tests/engineservicetest.py index b4a6d21..7336acd 100644 --- a/IPython/kernel/tests/engineservicetest.py +++ b/IPython/kernel/tests/engineservicetest.py @@ -163,7 +163,6 @@ class IEngineCoreTestCase(object): try: import numpy except: - print 'no numpy, ', return a = numpy.random.random(1000) d = self.engine.push(dict(a=a)) diff --git a/IPython/kernel/tests/multienginetest.py b/IPython/kernel/tests/multienginetest.py index 30a2df7..10b690e 100644 --- a/IPython/kernel/tests/multienginetest.py +++ b/IPython/kernel/tests/multienginetest.py @@ -733,7 +733,7 @@ class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r, range(16))) return d - + def testScatterGatherNumpyNonblocking(self): try: import numpy @@ -749,17 +749,7 @@ class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: assert_array_equal(r, a)) return d - - def testMapNonblocking(self): - self.addEngine(4) - def f(x): - return x**2 - data = range(16) - d= self.multiengine.map(f, data, block=False) - d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) - d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data])) - return d - + def test_clear_pending_deferreds(self): self.addEngine(4) did_list = [] diff --git a/IPython/kernel/tests/test_multienginefc.py b/IPython/kernel/tests/test_multienginefc.py index 610ada1..f390992 100644 --- a/IPython/kernel/tests/test_multienginefc.py +++ b/IPython/kernel/tests/test_multienginefc.py @@ -26,9 +26,20 @@ try: from IPython.kernel.multienginefc import IFCSynchronousMultiEngine from IPython.kernel import multiengine as me from IPython.kernel.clientconnector import ClientConnector + from IPython.kernel.parallelfunction import ParallelFunction + from IPython.kernel.error import CompositeError + from IPython.kernel.util import printer except ImportError: pass else: + + def _raise_it(f): + try: + f.raiseException() + except CompositeError, e: + e.raise_exception() + + class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase): def setUp(self): @@ -68,3 +79,66 @@ else: d.addBoth(lambda _: self.controller.stopService()) dlist.append(d) return defer.DeferredList(dlist) + + def test_mapper(self): + self.addEngine(4) + m = self.multiengine.mapper() + self.assertEquals(m.multiengine,self.multiengine) + self.assertEquals(m.dist,'b') + self.assertEquals(m.targets,'all') + self.assertEquals(m.block,True) + + def test_map_default(self): + self.addEngine(4) + m = self.multiengine.mapper() + d = m.map(lambda x: 2*x, range(10)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10))) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_map_noblock(self): + self.addEngine(4) + m = self.multiengine.mapper(block=False) + d = m.map(lambda x: 2*x, range(10)) + d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_mapper_fail(self): + self.addEngine(4) + m = self.multiengine.mapper() + d = m.map(lambda x: 1/0, range(10)) + d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) + return d + + def test_parallel(self): + self.addEngine(4) + p = self.multiengine.parallel() + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 2*x + d = f(range(10)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_parallel_noblock(self): + self.addEngine(1) + p = self.multiengine.parallel(block=False) + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 2*x + d = f(range(10)) + d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_parallel_fail(self): + self.addEngine(4) + p = self.multiengine.parallel() + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 1/0 + d = f(range(10)) + d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) + return d \ No newline at end of file diff --git a/docs/examples/kernel/fetchparse.py b/docs/examples/kernel/fetchparse.py index 4c5d489..421bb25 100644 --- a/docs/examples/kernel/fetchparse.py +++ b/docs/examples/kernel/fetchparse.py @@ -53,7 +53,7 @@ class DistributedSpider(object): self.allLinks.append(url) if url.startswith(self.site): print ' ', url - self.linksWorking[url] = self.tc.run(client.Task('links = fetchAndParse(url)', pull=['links'], push={'url': url})) + self.linksWorking[url] = self.tc.run(client.StringTask('links = fetchAndParse(url)', pull=['links'], push={'url': url})) def onVisitDone(self, result, url): print url, ':' diff --git a/docs/examples/kernel/helloworld.py b/docs/examples/kernel/helloworld.py index a96aa34..ce18244 100644 --- a/docs/examples/kernel/helloworld.py +++ b/docs/examples/kernel/helloworld.py @@ -8,7 +8,7 @@ tc = client.TaskClient() mec = client.MultiEngineClient() mec.execute('import time') -hello_taskid = tc.run(client.Task('time.sleep(3) ; word = "Hello,"', pull=('word'))) -world_taskid = tc.run(client.Task('time.sleep(3) ; word = "World!"', pull=('word'))) +hello_taskid = tc.run(client.StringTask('time.sleep(3) ; word = "Hello,"', pull=('word'))) +world_taskid = tc.run(client.StringTask('time.sleep(3) ; word = "World!"', pull=('word'))) print "Submitted tasks:", hello_taskid, world_taskid print tc.get_task_result(hello_taskid,block=True).ns.word, tc.get_task_result(world_taskid,block=True).ns.word diff --git a/docs/examples/kernel/task1.py b/docs/examples/kernel/task1.py index 7206faf..34d989d 100644 --- a/docs/examples/kernel/task1.py +++ b/docs/examples/kernel/task1.py @@ -14,5 +14,5 @@ c = a*b*d t1 = client.StringTask(cmd1, clear_before=False, clear_after=True, pull=['a','b','c']) tid1 = tc.run(t1) tr1 = tc.get_task_result(tid1,block=True) -tr1.raiseException() +tr1.raise_exception() print "a, b: ", tr1.ns.a, tr1.ns.b \ No newline at end of file diff --git a/docs/source/changes.txt b/docs/source/changes.txt index 127e261..03bbf91 100644 --- a/docs/source/changes.txt +++ b/docs/source/changes.txt @@ -12,6 +12,22 @@ Release 0.9 New features ------------ + * The notion of a task has been completely reworked. An `ITask` interface has + been created. This interface defines the methods that tasks need to implement. + These methods are now responsible for things like submitting tasks and processing + results. There are two basic task types: :class:`IPython.kernel.task.StringTask` + (this is the old `Task` object, but renamed) and the new + :class:`IPython.kernel.task.MapTask`, which is based on a function. + * A new interface, :class:`IPython.kernel.mapper.IMapper` has been defined to + standardize the idea of a `map` method. This interface has a single + `map` method that has the same syntax as the built-in `map`. We have also defined + a `mapper` factory interface that creates objects that implement + :class:`IPython.kernel.mapper.IMapper` for different controllers. Both + the multiengine and task controller now have mapping capabilties. + * The parallel function capabilities have been reworks. The major changes are that + i) there is now an `@parallel` magic that creates parallel functions, ii) + the syntax for mulitple variable follows that of `map`, iii) both the + multiengine and task controller now have a parallel function implementation. * All of the parallel computing capabilities from `ipython1-dev` have been merged into IPython proper. This resulted in the following new subpackages: :mod:`IPython.kernel`, :mod:`IPython.kernel.core`, :mod:`IPython.config`, @@ -38,11 +54,11 @@ New features when ipcluster is able to start things on other hosts, we will put security back. - - Bug fixes --------- + * The colors escapes in the multiengine client are now turned off on win32 as they + don't print correctly. * The :mod:`IPython.kernel.scripts.ipengine` script was exec'ing mpi_import_statement incorrectly, which was leading the engine to crash when mpi was enabled. * A few subpackages has missing `__init__.py` files. @@ -52,6 +68,8 @@ Bug fixes Backwards incompatible changes ------------------------------ + * :class:`IPython.kernel.client.Task` has been renamed + :class:`IPython.kernel.client.StringTask` to make way for new task types. * The keyword argument `style` has been renamed `dist` in `scatter`, `gather` and `map`. * Renamed the values that the rename `dist` keyword argument can have from