##// END OF EJS Templates
Fixing more tests and examples after the task, map and @parallel work.
Brian E Granger -
Show More
@@ -83,7 +83,7 class TestAsyncFrontendBase(unittest.TestCase):
83 83 d.addCallback(self.checkBlockID, expected='TEST_ID')
84 84
85 85 def test_blockID_added_to_failure(self):
86 block = "raise Exception()"
86 block = "raise Exception()"
87 87
88 88 d = self.fb.execute(block,blockID='TEST_ID')
89 89 d.addErrback(self.checkFailureID, expected='TEST_ID')
@@ -476,7 +476,7 class TaskResult(object):
476 476 return self._ns
477 477
478 478 def _setNS(self, v):
479 raise Exception("I am protected!")
479 raise Exception("the ns attribute cannot be changed")
480 480
481 481 ns = property(_getNS, _setNS)
482 482
@@ -163,7 +163,6 class IEngineCoreTestCase(object):
163 163 try:
164 164 import numpy
165 165 except:
166 print 'no numpy, ',
167 166 return
168 167 a = numpy.random.random(1000)
169 168 d = self.engine.push(dict(a=a))
@@ -733,7 +733,7 class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase
733 733 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
734 734 d.addCallback(lambda r: self.assertEquals(r, range(16)))
735 735 return d
736
736
737 737 def testScatterGatherNumpyNonblocking(self):
738 738 try:
739 739 import numpy
@@ -749,17 +749,7 class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase
749 749 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
750 750 d.addCallback(lambda r: assert_array_equal(r, a))
751 751 return d
752
753 def testMapNonblocking(self):
754 self.addEngine(4)
755 def f(x):
756 return x**2
757 data = range(16)
758 d= self.multiengine.map(f, data, block=False)
759 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
760 d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data]))
761 return d
762
752
763 753 def test_clear_pending_deferreds(self):
764 754 self.addEngine(4)
765 755 did_list = []
@@ -26,9 +26,20 try:
26 26 from IPython.kernel.multienginefc import IFCSynchronousMultiEngine
27 27 from IPython.kernel import multiengine as me
28 28 from IPython.kernel.clientconnector import ClientConnector
29 from IPython.kernel.parallelfunction import ParallelFunction
30 from IPython.kernel.error import CompositeError
31 from IPython.kernel.util import printer
29 32 except ImportError:
30 33 pass
31 34 else:
35
36 def _raise_it(f):
37 try:
38 f.raiseException()
39 except CompositeError, e:
40 e.raise_exception()
41
42
32 43 class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase):
33 44
34 45 def setUp(self):
@@ -68,3 +79,66 else:
68 79 d.addBoth(lambda _: self.controller.stopService())
69 80 dlist.append(d)
70 81 return defer.DeferredList(dlist)
82
83 def test_mapper(self):
84 self.addEngine(4)
85 m = self.multiengine.mapper()
86 self.assertEquals(m.multiengine,self.multiengine)
87 self.assertEquals(m.dist,'b')
88 self.assertEquals(m.targets,'all')
89 self.assertEquals(m.block,True)
90
91 def test_map_default(self):
92 self.addEngine(4)
93 m = self.multiengine.mapper()
94 d = m.map(lambda x: 2*x, range(10))
95 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
96 d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10)))
97 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
98 return d
99
100 def test_map_noblock(self):
101 self.addEngine(4)
102 m = self.multiengine.mapper(block=False)
103 d = m.map(lambda x: 2*x, range(10))
104 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
105 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
106 return d
107
108 def test_mapper_fail(self):
109 self.addEngine(4)
110 m = self.multiengine.mapper()
111 d = m.map(lambda x: 1/0, range(10))
112 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
113 return d
114
115 def test_parallel(self):
116 self.addEngine(4)
117 p = self.multiengine.parallel()
118 self.assert_(isinstance(p, ParallelFunction))
119 @p
120 def f(x): return 2*x
121 d = f(range(10))
122 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
123 return d
124
125 def test_parallel_noblock(self):
126 self.addEngine(1)
127 p = self.multiengine.parallel(block=False)
128 self.assert_(isinstance(p, ParallelFunction))
129 @p
130 def f(x): return 2*x
131 d = f(range(10))
132 d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True))
133 d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)]))
134 return d
135
136 def test_parallel_fail(self):
137 self.addEngine(4)
138 p = self.multiengine.parallel()
139 self.assert_(isinstance(p, ParallelFunction))
140 @p
141 def f(x): return 1/0
142 d = f(range(10))
143 d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f))
144 return d No newline at end of file
@@ -53,7 +53,7 class DistributedSpider(object):
53 53 self.allLinks.append(url)
54 54 if url.startswith(self.site):
55 55 print ' ', url
56 self.linksWorking[url] = self.tc.run(client.Task('links = fetchAndParse(url)', pull=['links'], push={'url': url}))
56 self.linksWorking[url] = self.tc.run(client.StringTask('links = fetchAndParse(url)', pull=['links'], push={'url': url}))
57 57
58 58 def onVisitDone(self, result, url):
59 59 print url, ':'
@@ -8,7 +8,7 tc = client.TaskClient()
8 8 mec = client.MultiEngineClient()
9 9
10 10 mec.execute('import time')
11 hello_taskid = tc.run(client.Task('time.sleep(3) ; word = "Hello,"', pull=('word')))
12 world_taskid = tc.run(client.Task('time.sleep(3) ; word = "World!"', pull=('word')))
11 hello_taskid = tc.run(client.StringTask('time.sleep(3) ; word = "Hello,"', pull=('word')))
12 world_taskid = tc.run(client.StringTask('time.sleep(3) ; word = "World!"', pull=('word')))
13 13 print "Submitted tasks:", hello_taskid, world_taskid
14 14 print tc.get_task_result(hello_taskid,block=True).ns.word, tc.get_task_result(world_taskid,block=True).ns.word
@@ -14,5 +14,5 c = a*b*d
14 14 t1 = client.StringTask(cmd1, clear_before=False, clear_after=True, pull=['a','b','c'])
15 15 tid1 = tc.run(t1)
16 16 tr1 = tc.get_task_result(tid1,block=True)
17 tr1.raiseException()
17 tr1.raise_exception()
18 18 print "a, b: ", tr1.ns.a, tr1.ns.b No newline at end of file
@@ -12,6 +12,22 Release 0.9
12 12 New features
13 13 ------------
14 14
15 * The notion of a task has been completely reworked. An `ITask` interface has
16 been created. This interface defines the methods that tasks need to implement.
17 These methods are now responsible for things like submitting tasks and processing
18 results. There are two basic task types: :class:`IPython.kernel.task.StringTask`
19 (this is the old `Task` object, but renamed) and the new
20 :class:`IPython.kernel.task.MapTask`, which is based on a function.
21 * A new interface, :class:`IPython.kernel.mapper.IMapper` has been defined to
22 standardize the idea of a `map` method. This interface has a single
23 `map` method that has the same syntax as the built-in `map`. We have also defined
24 a `mapper` factory interface that creates objects that implement
25 :class:`IPython.kernel.mapper.IMapper` for different controllers. Both
26 the multiengine and task controller now have mapping capabilties.
27 * The parallel function capabilities have been reworks. The major changes are that
28 i) there is now an `@parallel` magic that creates parallel functions, ii)
29 the syntax for mulitple variable follows that of `map`, iii) both the
30 multiengine and task controller now have a parallel function implementation.
15 31 * All of the parallel computing capabilities from `ipython1-dev` have been merged into
16 32 IPython proper. This resulted in the following new subpackages:
17 33 :mod:`IPython.kernel`, :mod:`IPython.kernel.core`, :mod:`IPython.config`,
@@ -38,11 +54,11 New features
38 54 when ipcluster is able to start things on other hosts, we will put security
39 55 back.
40 56
41
42
43 57 Bug fixes
44 58 ---------
45 59
60 * The colors escapes in the multiengine client are now turned off on win32 as they
61 don't print correctly.
46 62 * The :mod:`IPython.kernel.scripts.ipengine` script was exec'ing mpi_import_statement
47 63 incorrectly, which was leading the engine to crash when mpi was enabled.
48 64 * A few subpackages has missing `__init__.py` files.
@@ -52,6 +68,8 Bug fixes
52 68 Backwards incompatible changes
53 69 ------------------------------
54 70
71 * :class:`IPython.kernel.client.Task` has been renamed
72 :class:`IPython.kernel.client.StringTask` to make way for new task types.
55 73 * The keyword argument `style` has been renamed `dist` in `scatter`, `gather`
56 74 and `map`.
57 75 * Renamed the values that the rename `dist` keyword argument can have from
General Comments 0
You need to be logged in to leave comments. Login now