##// END OF EJS Templates
add tests for parallel magics...
MinRK -
Show More
@@ -1,276 +1,290 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Magic command interface for interactive parallel work."""
4 """Magic command interface for interactive parallel work."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import ast
17 import ast
18 import re
18 import re
19
19
20 from IPython.core.plugin import Plugin
20 from IPython.core.plugin import Plugin
21 from IPython.utils.traitlets import Bool, Any, Instance
21 from IPython.utils.traitlets import Bool, Any, Instance
22 from IPython.testing import decorators as testdec
22 from IPython.testing import decorators as testdec
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Definitions of magic functions for use with IPython
25 # Definitions of magic functions for use with IPython
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28
28
29 NO_ACTIVE_VIEW = """
29 NO_ACTIVE_VIEW = """
30 Use activate() on a DirectView object to activate it for magics.
30 Use activate() on a DirectView object to activate it for magics.
31 """
31 """
32
32
33
33
34 class ParalleMagic(Plugin):
34 class ParalleMagic(Plugin):
35 """A component to manage the %result, %px and %autopx magics."""
35 """A component to manage the %result, %px and %autopx magics."""
36
36
37 active_view = Any()
37 active_view = Instance('IPython.parallel.client.view.DirectView')
38 verbose = Bool(False, config=True)
38 verbose = Bool(False, config=True)
39 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
39 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
40
40
41 def __init__(self, shell=None, config=None):
41 def __init__(self, shell=None, config=None):
42 super(ParalleMagic, self).__init__(shell=shell, config=config)
42 super(ParalleMagic, self).__init__(shell=shell, config=config)
43 self._define_magics()
43 self._define_magics()
44 # A flag showing if autopx is activated or not
44 # A flag showing if autopx is activated or not
45 self.autopx = False
45 self.autopx = False
46
46
47 def _define_magics(self):
47 def _define_magics(self):
48 """Define the magic functions."""
48 """Define the magic functions."""
49 self.shell.define_magic('result', self.magic_result)
49 self.shell.define_magic('result', self.magic_result)
50 self.shell.define_magic('px', self.magic_px)
50 self.shell.define_magic('px', self.magic_px)
51 self.shell.define_magic('autopx', self.magic_autopx)
51 self.shell.define_magic('autopx', self.magic_autopx)
52
52
53 @testdec.skip_doctest
53 @testdec.skip_doctest
54 def magic_result(self, ipself, parameter_s=''):
54 def magic_result(self, ipself, parameter_s=''):
55 """Print the result of command i on all engines..
55 """Print the result of command i on all engines..
56
56
57 To use this a :class:`DirectView` instance must be created
57 To use this a :class:`DirectView` instance must be created
58 and then activated by calling its :meth:`activate` method.
58 and then activated by calling its :meth:`activate` method.
59
59
60 Then you can do the following::
60 Then you can do the following::
61
61
62 In [23]: %result
62 In [23]: %result
63 Out[23]:
63 Out[23]:
64 <Results List>
64 <Results List>
65 [0] In [6]: a = 10
65 [0] In [6]: a = 10
66 [1] In [6]: a = 10
66 [1] In [6]: a = 10
67
67
68 In [22]: %result 6
68 In [22]: %result 6
69 Out[22]:
69 Out[22]:
70 <Results List>
70 <Results List>
71 [0] In [6]: a = 10
71 [0] In [6]: a = 10
72 [1] In [6]: a = 10
72 [1] In [6]: a = 10
73 """
73 """
74 if self.active_view is None:
74 if self.active_view is None:
75 print NO_ACTIVE_VIEW
75 print NO_ACTIVE_VIEW
76 return
76 return
77
77
78 try:
78 try:
79 index = int(parameter_s)
79 index = int(parameter_s)
80 except:
80 except:
81 index = None
81 index = None
82 result = self.active_view.get_result(index)
82 result = self.active_view.get_result(index)
83 return result
83 return result
84
84
85 @testdec.skip_doctest
85 @testdec.skip_doctest
86 def magic_px(self, ipself, parameter_s=''):
86 def magic_px(self, ipself, parameter_s=''):
87 """Executes the given python command in parallel.
87 """Executes the given python command in parallel.
88
88
89 To use this a :class:`DirectView` instance must be created
89 To use this a :class:`DirectView` instance must be created
90 and then activated by calling its :meth:`activate` method.
90 and then activated by calling its :meth:`activate` method.
91
91
92 Then you can do the following::
92 Then you can do the following::
93
93
94 In [24]: %px a = 5
94 In [24]: %px a = 5
95 Parallel execution on engines: all
95 Parallel execution on engines: all
96 Out[24]:
96 Out[24]:
97 <Results List>
97 <Results List>
98 [0] In [7]: a = 5
98 [0] In [7]: a = 5
99 [1] In [7]: a = 5
99 [1] In [7]: a = 5
100 """
100 """
101
101
102 if self.active_view is None:
102 if self.active_view is None:
103 print NO_ACTIVE_VIEW
103 print NO_ACTIVE_VIEW
104 return
104 return
105 print "Parallel execution on engines: %s" % self.active_view.targets
105 print "Parallel execution on engines: %s" % self.active_view.targets
106 result = self.active_view.execute(parameter_s)
106 result = self.active_view.execute(parameter_s, block=False)
107 return result
107 if self.active_view.block:
108 result.get()
109 self._maybe_display_output(result)
108
110
109 @testdec.skip_doctest
111 @testdec.skip_doctest
110 def magic_autopx(self, ipself, parameter_s=''):
112 def magic_autopx(self, ipself, parameter_s=''):
111 """Toggles auto parallel mode.
113 """Toggles auto parallel mode.
112
114
113 To use this a :class:`DirectView` instance must be created
115 To use this a :class:`DirectView` instance must be created
114 and then activated by calling its :meth:`activate` method. Once this
116 and then activated by calling its :meth:`activate` method. Once this
115 is called, all commands typed at the command line are send to
117 is called, all commands typed at the command line are send to
116 the engines to be executed in parallel. To control which engine
118 the engines to be executed in parallel. To control which engine
117 are used, set the ``targets`` attributed of the multiengine client
119 are used, set the ``targets`` attributed of the multiengine client
118 before entering ``%autopx`` mode.
120 before entering ``%autopx`` mode.
119
121
120 Then you can do the following::
122 Then you can do the following::
121
123
122 In [25]: %autopx
124 In [25]: %autopx
123 %autopx to enabled
125 %autopx to enabled
124
126
125 In [26]: a = 10
127 In [26]: a = 10
126 <Results List>
128 Parallel execution on engines: [0,1,2,3]
127 [0] In [8]: a = 10
129 In [27]: print a
128 [1] In [8]: a = 10
130 Parallel execution on engines: [0,1,2,3]
131 [stdout:0] 10
132 [stdout:1] 10
133 [stdout:2] 10
134 [stdout:3] 10
129
135
130
136
131 In [27]: %autopx
137 In [27]: %autopx
132 %autopx disabled
138 %autopx disabled
133 """
139 """
134 if self.autopx:
140 if self.autopx:
135 self._disable_autopx()
141 self._disable_autopx()
136 else:
142 else:
137 self._enable_autopx()
143 self._enable_autopx()
138
144
139 def _enable_autopx(self):
145 def _enable_autopx(self):
140 """Enable %autopx mode by saving the original run_cell and installing
146 """Enable %autopx mode by saving the original run_cell and installing
141 pxrun_cell.
147 pxrun_cell.
142 """
148 """
143 if self.active_view is None:
149 if self.active_view is None:
144 print NO_ACTIVE_VIEW
150 print NO_ACTIVE_VIEW
145 return
151 return
146
152
147 # override run_cell and run_code
153 # override run_cell and run_code
148 self._original_run_cell = self.shell.run_cell
154 self._original_run_cell = self.shell.run_cell
149 self.shell.run_cell = self.pxrun_cell
155 self.shell.run_cell = self.pxrun_cell
150 self._original_run_code = self.shell.run_code
156 self._original_run_code = self.shell.run_code
151 self.shell.run_code = self.pxrun_code
157 self.shell.run_code = self.pxrun_code
152
158
153 self.autopx = True
159 self.autopx = True
154 print "%autopx enabled"
160 print "%autopx enabled"
155
161
156 def _disable_autopx(self):
162 def _disable_autopx(self):
157 """Disable %autopx by restoring the original InteractiveShell.run_cell.
163 """Disable %autopx by restoring the original InteractiveShell.run_cell.
158 """
164 """
159 if self.autopx:
165 if self.autopx:
160 self.shell.run_cell = self._original_run_cell
166 self.shell.run_cell = self._original_run_cell
161 self.shell.run_code = self._original_run_code
167 self.shell.run_code = self._original_run_code
162 self.autopx = False
168 self.autopx = False
163 print "%autopx disabled"
169 print "%autopx disabled"
164
170
165 def _maybe_display_output(self, result):
171 def _maybe_display_output(self, result):
166 """Maybe display the output of a parallel result.
172 """Maybe display the output of a parallel result.
167
173
168 If self.active_view.block is True, wait for the result
174 If self.active_view.block is True, wait for the result
169 and display the result. Otherwise, this is a noop.
175 and display the result. Otherwise, this is a noop.
170 """
176 """
171 if self.active_view.block:
177 targets = self.active_view.targets
172 try:
178 if isinstance(targets, int):
173 result.get()
179 targets = [targets]
174 except:
180 if targets == 'all':
175 self.shell.showtraceback()
181 targets = self.active_view.client.ids
176 return True
182 stdout = [s.rstrip() for s in result.stdout]
177 else:
183 if any(stdout):
178 targets = self.active_view.targets
184 for i,eid in enumerate(targets):
179 if isinstance(targets, int):
185 print '[stdout:%i]'%eid, stdout[i]
180 targets = [targets]
181 if targets == 'all':
182 targets = self.active_view.client.ids
183 stdout = [s.rstrip() for s in result.stdout]
184 if any(stdout):
185 for i,eid in enumerate(targets):
186 print '[stdout:%i]'%eid, stdout[i]
187 return False
188
186
189
187
190 def pxrun_cell(self, cell, store_history=True):
188 def pxrun_cell(self, cell, store_history=True):
191 """drop-in replacement for InteractiveShell.run_cell.
189 """drop-in replacement for InteractiveShell.run_cell.
192
190
193 This executes code remotely, instead of in the local namespace.
191 This executes code remotely, instead of in the local namespace.
194
192
195 See InteractiveShell.run_cell for details.
193 See InteractiveShell.run_cell for details.
196 """
194 """
197 ipself = self.shell
195 ipself = self.shell
198 raw_cell = cell
196 raw_cell = cell
199 with ipself.builtin_trap:
197 with ipself.builtin_trap:
200 cell = ipself.prefilter_manager.prefilter_lines(cell)
198 cell = ipself.prefilter_manager.prefilter_lines(cell)
201
199
202 # Store raw and processed history
200 # Store raw and processed history
203 if store_history:
201 if store_history:
204 ipself.history_manager.store_inputs(ipself.execution_count,
202 ipself.history_manager.store_inputs(ipself.execution_count,
205 cell, raw_cell)
203 cell, raw_cell)
206
204
207 # ipself.logger.log(cell, raw_cell)
205 # ipself.logger.log(cell, raw_cell)
208
206
209 cell_name = ipself.compile.cache(cell, ipself.execution_count)
207 cell_name = ipself.compile.cache(cell, ipself.execution_count)
210
208
211 try:
209 try:
212 code_ast = ast.parse(cell, filename=cell_name)
210 code_ast = ast.parse(cell, filename=cell_name)
213 except (OverflowError, SyntaxError, ValueError, TypeError, MemoryError):
211 except (OverflowError, SyntaxError, ValueError, TypeError, MemoryError):
214 # Case 1
212 # Case 1
215 ipself.showsyntaxerror()
213 ipself.showsyntaxerror()
216 ipself.execution_count += 1
214 ipself.execution_count += 1
217 return None
215 return None
218 except NameError:
216 except NameError:
219 # ignore name errors, because we don't know the remote keys
217 # ignore name errors, because we don't know the remote keys
220 pass
218 pass
221
219
222 if store_history:
220 if store_history:
223 # Write output to the database. Does nothing unless
221 # Write output to the database. Does nothing unless
224 # history output logging is enabled.
222 # history output logging is enabled.
225 ipself.history_manager.store_output(ipself.execution_count)
223 ipself.history_manager.store_output(ipself.execution_count)
226 # Each cell is a *single* input, regardless of how many lines it has
224 # Each cell is a *single* input, regardless of how many lines it has
227 ipself.execution_count += 1
225 ipself.execution_count += 1
228
226
229 if re.search(r'get_ipython\(\)\.magic\(u?"%?autopx', cell):
227 if re.search(r'get_ipython\(\)\.magic\(u?"%?autopx', cell):
230 self._disable_autopx()
228 self._disable_autopx()
231 return False
229 return False
232 else:
230 else:
233 try:
231 try:
234 result = self.active_view.execute(cell, block=False)
232 result = self.active_view.execute(cell, block=False)
235 except:
233 except:
236 ipself.showtraceback()
234 ipself.showtraceback()
237 return False
235 return True
238 else:
236 else:
239 return self._maybe_display_output(result)
237 if self.active_view.block:
238 try:
239 result.get()
240 except:
241 self.shell.showtraceback()
242 return True
243 else:
244 self._maybe_display_output(result)
245 return False
240
246
241 def pxrun_code(self, code_obj, post_execute=True):
247 def pxrun_code(self, code_obj, post_execute=True):
242 """drop-in replacement for InteractiveShell.run_code.
248 """drop-in replacement for InteractiveShell.run_code.
243
249
244 This executes code remotely, instead of in the local namespace.
250 This executes code remotely, instead of in the local namespace.
245
251
246 See InteractiveShell.run_code for details.
252 See InteractiveShell.run_code for details.
247 """
253 """
248 ipself = self.shell
254 ipself = self.shell
249 # check code object for the autopx magic
255 # check code object for the autopx magic
250 if 'get_ipython' in code_obj.co_names and 'magic' in code_obj.co_names and \
256 if 'get_ipython' in code_obj.co_names and 'magic' in code_obj.co_names and \
251 any( [ isinstance(c, basestring) and 'autopx' in c for c in code_obj.co_consts ]):
257 any( [ isinstance(c, basestring) and 'autopx' in c for c in code_obj.co_consts ]):
252 self._disable_autopx()
258 self._disable_autopx()
253 return False
259 return False
254 else:
260 else:
255 try:
261 try:
256 result = self.active_view.execute(code_obj, block=False)
262 result = self.active_view.execute(code_obj, block=False)
257 except:
263 except:
258 ipself.showtraceback()
264 ipself.showtraceback()
259 return False
265 return True
260 else:
266 else:
261 return self._maybe_display_output(result)
267 if self.active_view.block:
268 try:
269 result.get()
270 except:
271 self.shell.showtraceback()
272 return True
273 else:
274 self._maybe_display_output(result)
275 return False
262
276
263
277
264
278
265
279
266 _loaded = False
280 _loaded = False
267
281
268
282
269 def load_ipython_extension(ip):
283 def load_ipython_extension(ip):
270 """Load the extension in IPython."""
284 """Load the extension in IPython."""
271 global _loaded
285 global _loaded
272 if not _loaded:
286 if not _loaded:
273 plugin = ParalleMagic(shell=ip, config=ip.config)
287 plugin = ParalleMagic(shell=ip, config=ip.config)
274 ip.plugin_manager.register_plugin('parallelmagic', plugin)
288 ip.plugin_manager.register_plugin('parallelmagic', plugin)
275 _loaded = True
289 _loaded = True
276
290
@@ -1,302 +1,413 b''
1 """test View objects"""
1 """test View objects"""
2 #-------------------------------------------------------------------------------
2 #-------------------------------------------------------------------------------
3 # Copyright (C) 2011 The IPython Development Team
3 # Copyright (C) 2011 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-------------------------------------------------------------------------------
7 #-------------------------------------------------------------------------------
8
8
9 #-------------------------------------------------------------------------------
9 #-------------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12
12
13 import sys
13 import time
14 import time
14 from tempfile import mktemp
15 from tempfile import mktemp
16 from StringIO import StringIO
15
17
16 import zmq
18 import zmq
17
19
18 from IPython import parallel as pmod
20 from IPython import parallel as pmod
19 from IPython.parallel import error
21 from IPython.parallel import error
20 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
22 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
21 from IPython.parallel import LoadBalancedView, DirectView
23 from IPython.parallel import LoadBalancedView, DirectView
22 from IPython.parallel.util import interactive
24 from IPython.parallel.util import interactive
23
25
24 from IPython.parallel.tests import add_engines
26 from IPython.parallel.tests import add_engines
25
27
26 from .clienttest import ClusterTestCase, segfault, wait, skip_without
28 from .clienttest import ClusterTestCase, segfault, wait, skip_without
27
29
28 def setup():
30 def setup():
29 add_engines(3)
31 add_engines(3)
30
32
31 class TestView(ClusterTestCase):
33 class TestView(ClusterTestCase):
32
34
33 def test_segfault_task(self):
35 def test_segfault_task(self):
34 """test graceful handling of engine death (balanced)"""
36 """test graceful handling of engine death (balanced)"""
35 # self.add_engines(1)
37 # self.add_engines(1)
36 ar = self.client[-1].apply_async(segfault)
38 ar = self.client[-1].apply_async(segfault)
37 self.assertRaisesRemote(error.EngineError, ar.get)
39 self.assertRaisesRemote(error.EngineError, ar.get)
38 eid = ar.engine_id
40 eid = ar.engine_id
39 while eid in self.client.ids:
41 while eid in self.client.ids:
40 time.sleep(.01)
42 time.sleep(.01)
41 self.client.spin()
43 self.client.spin()
42
44
43 def test_segfault_mux(self):
45 def test_segfault_mux(self):
44 """test graceful handling of engine death (direct)"""
46 """test graceful handling of engine death (direct)"""
45 # self.add_engines(1)
47 # self.add_engines(1)
46 eid = self.client.ids[-1]
48 eid = self.client.ids[-1]
47 ar = self.client[eid].apply_async(segfault)
49 ar = self.client[eid].apply_async(segfault)
48 self.assertRaisesRemote(error.EngineError, ar.get)
50 self.assertRaisesRemote(error.EngineError, ar.get)
49 eid = ar.engine_id
51 eid = ar.engine_id
50 while eid in self.client.ids:
52 while eid in self.client.ids:
51 time.sleep(.01)
53 time.sleep(.01)
52 self.client.spin()
54 self.client.spin()
53
55
54 def test_push_pull(self):
56 def test_push_pull(self):
55 """test pushing and pulling"""
57 """test pushing and pulling"""
56 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
58 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
57 t = self.client.ids[-1]
59 t = self.client.ids[-1]
58 v = self.client[t]
60 v = self.client[t]
59 push = v.push
61 push = v.push
60 pull = v.pull
62 pull = v.pull
61 v.block=True
63 v.block=True
62 nengines = len(self.client)
64 nengines = len(self.client)
63 push({'data':data})
65 push({'data':data})
64 d = pull('data')
66 d = pull('data')
65 self.assertEquals(d, data)
67 self.assertEquals(d, data)
66 self.client[:].push({'data':data})
68 self.client[:].push({'data':data})
67 d = self.client[:].pull('data', block=True)
69 d = self.client[:].pull('data', block=True)
68 self.assertEquals(d, nengines*[data])
70 self.assertEquals(d, nengines*[data])
69 ar = push({'data':data}, block=False)
71 ar = push({'data':data}, block=False)
70 self.assertTrue(isinstance(ar, AsyncResult))
72 self.assertTrue(isinstance(ar, AsyncResult))
71 r = ar.get()
73 r = ar.get()
72 ar = self.client[:].pull('data', block=False)
74 ar = self.client[:].pull('data', block=False)
73 self.assertTrue(isinstance(ar, AsyncResult))
75 self.assertTrue(isinstance(ar, AsyncResult))
74 r = ar.get()
76 r = ar.get()
75 self.assertEquals(r, nengines*[data])
77 self.assertEquals(r, nengines*[data])
76 self.client[:].push(dict(a=10,b=20))
78 self.client[:].push(dict(a=10,b=20))
77 r = self.client[:].pull(('a','b'), block=True)
79 r = self.client[:].pull(('a','b'), block=True)
78 self.assertEquals(r, nengines*[[10,20]])
80 self.assertEquals(r, nengines*[[10,20]])
79
81
80 def test_push_pull_function(self):
82 def test_push_pull_function(self):
81 "test pushing and pulling functions"
83 "test pushing and pulling functions"
82 def testf(x):
84 def testf(x):
83 return 2.0*x
85 return 2.0*x
84
86
85 t = self.client.ids[-1]
87 t = self.client.ids[-1]
86 v = self.client[t]
88 v = self.client[t]
87 v.block=True
89 v.block=True
88 push = v.push
90 push = v.push
89 pull = v.pull
91 pull = v.pull
90 execute = v.execute
92 execute = v.execute
91 push({'testf':testf})
93 push({'testf':testf})
92 r = pull('testf')
94 r = pull('testf')
93 self.assertEqual(r(1.0), testf(1.0))
95 self.assertEqual(r(1.0), testf(1.0))
94 execute('r = testf(10)')
96 execute('r = testf(10)')
95 r = pull('r')
97 r = pull('r')
96 self.assertEquals(r, testf(10))
98 self.assertEquals(r, testf(10))
97 ar = self.client[:].push({'testf':testf}, block=False)
99 ar = self.client[:].push({'testf':testf}, block=False)
98 ar.get()
100 ar.get()
99 ar = self.client[:].pull('testf', block=False)
101 ar = self.client[:].pull('testf', block=False)
100 rlist = ar.get()
102 rlist = ar.get()
101 for r in rlist:
103 for r in rlist:
102 self.assertEqual(r(1.0), testf(1.0))
104 self.assertEqual(r(1.0), testf(1.0))
103 execute("def g(x): return x*x")
105 execute("def g(x): return x*x")
104 r = pull(('testf','g'))
106 r = pull(('testf','g'))
105 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
107 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
106
108
107 def test_push_function_globals(self):
109 def test_push_function_globals(self):
108 """test that pushed functions have access to globals"""
110 """test that pushed functions have access to globals"""
109 @interactive
111 @interactive
110 def geta():
112 def geta():
111 return a
113 return a
112 # self.add_engines(1)
114 # self.add_engines(1)
113 v = self.client[-1]
115 v = self.client[-1]
114 v.block=True
116 v.block=True
115 v['f'] = geta
117 v['f'] = geta
116 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
118 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
117 v.execute('a=5')
119 v.execute('a=5')
118 v.execute('b=f()')
120 v.execute('b=f()')
119 self.assertEquals(v['b'], 5)
121 self.assertEquals(v['b'], 5)
120
122
121 def test_push_function_defaults(self):
123 def test_push_function_defaults(self):
122 """test that pushed functions preserve default args"""
124 """test that pushed functions preserve default args"""
123 def echo(a=10):
125 def echo(a=10):
124 return a
126 return a
125 v = self.client[-1]
127 v = self.client[-1]
126 v.block=True
128 v.block=True
127 v['f'] = echo
129 v['f'] = echo
128 v.execute('b=f()')
130 v.execute('b=f()')
129 self.assertEquals(v['b'], 10)
131 self.assertEquals(v['b'], 10)
130
132
131 def test_get_result(self):
133 def test_get_result(self):
132 """test getting results from the Hub."""
134 """test getting results from the Hub."""
133 c = pmod.Client(profile='iptest')
135 c = pmod.Client(profile='iptest')
134 # self.add_engines(1)
136 # self.add_engines(1)
135 t = c.ids[-1]
137 t = c.ids[-1]
136 v = c[t]
138 v = c[t]
137 v2 = self.client[t]
139 v2 = self.client[t]
138 ar = v.apply_async(wait, 1)
140 ar = v.apply_async(wait, 1)
139 # give the monitor time to notice the message
141 # give the monitor time to notice the message
140 time.sleep(.25)
142 time.sleep(.25)
141 ahr = v2.get_result(ar.msg_ids)
143 ahr = v2.get_result(ar.msg_ids)
142 self.assertTrue(isinstance(ahr, AsyncHubResult))
144 self.assertTrue(isinstance(ahr, AsyncHubResult))
143 self.assertEquals(ahr.get(), ar.get())
145 self.assertEquals(ahr.get(), ar.get())
144 ar2 = v2.get_result(ar.msg_ids)
146 ar2 = v2.get_result(ar.msg_ids)
145 self.assertFalse(isinstance(ar2, AsyncHubResult))
147 self.assertFalse(isinstance(ar2, AsyncHubResult))
146 c.spin()
148 c.spin()
147 c.close()
149 c.close()
148
150
149 def test_run_newline(self):
151 def test_run_newline(self):
150 """test that run appends newline to files"""
152 """test that run appends newline to files"""
151 tmpfile = mktemp()
153 tmpfile = mktemp()
152 with open(tmpfile, 'w') as f:
154 with open(tmpfile, 'w') as f:
153 f.write("""def g():
155 f.write("""def g():
154 return 5
156 return 5
155 """)
157 """)
156 v = self.client[-1]
158 v = self.client[-1]
157 v.run(tmpfile, block=True)
159 v.run(tmpfile, block=True)
158 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
160 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
159
161
160 def test_apply_tracked(self):
162 def test_apply_tracked(self):
161 """test tracking for apply"""
163 """test tracking for apply"""
162 # self.add_engines(1)
164 # self.add_engines(1)
163 t = self.client.ids[-1]
165 t = self.client.ids[-1]
164 v = self.client[t]
166 v = self.client[t]
165 v.block=False
167 v.block=False
166 def echo(n=1024*1024, **kwargs):
168 def echo(n=1024*1024, **kwargs):
167 with v.temp_flags(**kwargs):
169 with v.temp_flags(**kwargs):
168 return v.apply(lambda x: x, 'x'*n)
170 return v.apply(lambda x: x, 'x'*n)
169 ar = echo(1, track=False)
171 ar = echo(1, track=False)
170 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
172 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
171 self.assertTrue(ar.sent)
173 self.assertTrue(ar.sent)
172 ar = echo(track=True)
174 ar = echo(track=True)
173 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
175 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
174 self.assertEquals(ar.sent, ar._tracker.done)
176 self.assertEquals(ar.sent, ar._tracker.done)
175 ar._tracker.wait()
177 ar._tracker.wait()
176 self.assertTrue(ar.sent)
178 self.assertTrue(ar.sent)
177
179
178 def test_push_tracked(self):
180 def test_push_tracked(self):
179 t = self.client.ids[-1]
181 t = self.client.ids[-1]
180 ns = dict(x='x'*1024*1024)
182 ns = dict(x='x'*1024*1024)
181 v = self.client[t]
183 v = self.client[t]
182 ar = v.push(ns, block=False, track=False)
184 ar = v.push(ns, block=False, track=False)
183 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
184 self.assertTrue(ar.sent)
186 self.assertTrue(ar.sent)
185
187
186 ar = v.push(ns, block=False, track=True)
188 ar = v.push(ns, block=False, track=True)
187 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertEquals(ar.sent, ar._tracker.done)
190 self.assertEquals(ar.sent, ar._tracker.done)
189 ar._tracker.wait()
191 ar._tracker.wait()
190 self.assertTrue(ar.sent)
192 self.assertTrue(ar.sent)
191 ar.get()
193 ar.get()
192
194
193 def test_scatter_tracked(self):
195 def test_scatter_tracked(self):
194 t = self.client.ids
196 t = self.client.ids
195 x='x'*1024*1024
197 x='x'*1024*1024
196 ar = self.client[t].scatter('x', x, block=False, track=False)
198 ar = self.client[t].scatter('x', x, block=False, track=False)
197 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
199 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
198 self.assertTrue(ar.sent)
200 self.assertTrue(ar.sent)
199
201
200 ar = self.client[t].scatter('x', x, block=False, track=True)
202 ar = self.client[t].scatter('x', x, block=False, track=True)
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertEquals(ar.sent, ar._tracker.done)
204 self.assertEquals(ar.sent, ar._tracker.done)
203 ar._tracker.wait()
205 ar._tracker.wait()
204 self.assertTrue(ar.sent)
206 self.assertTrue(ar.sent)
205 ar.get()
207 ar.get()
206
208
207 def test_remote_reference(self):
209 def test_remote_reference(self):
208 v = self.client[-1]
210 v = self.client[-1]
209 v['a'] = 123
211 v['a'] = 123
210 ra = pmod.Reference('a')
212 ra = pmod.Reference('a')
211 b = v.apply_sync(lambda x: x, ra)
213 b = v.apply_sync(lambda x: x, ra)
212 self.assertEquals(b, 123)
214 self.assertEquals(b, 123)
213
215
214
216
215 def test_scatter_gather(self):
217 def test_scatter_gather(self):
216 view = self.client[:]
218 view = self.client[:]
217 seq1 = range(16)
219 seq1 = range(16)
218 view.scatter('a', seq1)
220 view.scatter('a', seq1)
219 seq2 = view.gather('a', block=True)
221 seq2 = view.gather('a', block=True)
220 self.assertEquals(seq2, seq1)
222 self.assertEquals(seq2, seq1)
221 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
223 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
222
224
223 @skip_without('numpy')
225 @skip_without('numpy')
224 def test_scatter_gather_numpy(self):
226 def test_scatter_gather_numpy(self):
225 import numpy
227 import numpy
226 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
228 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
227 view = self.client[:]
229 view = self.client[:]
228 a = numpy.arange(64)
230 a = numpy.arange(64)
229 view.scatter('a', a)
231 view.scatter('a', a)
230 b = view.gather('a', block=True)
232 b = view.gather('a', block=True)
231 assert_array_equal(b, a)
233 assert_array_equal(b, a)
232
234
233 def test_map(self):
235 def test_map(self):
234 view = self.client[:]
236 view = self.client[:]
235 def f(x):
237 def f(x):
236 return x**2
238 return x**2
237 data = range(16)
239 data = range(16)
238 r = view.map_sync(f, data)
240 r = view.map_sync(f, data)
239 self.assertEquals(r, map(f, data))
241 self.assertEquals(r, map(f, data))
240
242
241 def test_scatterGatherNonblocking(self):
243 def test_scatterGatherNonblocking(self):
242 data = range(16)
244 data = range(16)
243 view = self.client[:]
245 view = self.client[:]
244 view.scatter('a', data, block=False)
246 view.scatter('a', data, block=False)
245 ar = view.gather('a', block=False)
247 ar = view.gather('a', block=False)
246 self.assertEquals(ar.get(), data)
248 self.assertEquals(ar.get(), data)
247
249
248 @skip_without('numpy')
250 @skip_without('numpy')
249 def test_scatter_gather_numpy_nonblocking(self):
251 def test_scatter_gather_numpy_nonblocking(self):
250 import numpy
252 import numpy
251 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
253 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
252 a = numpy.arange(64)
254 a = numpy.arange(64)
253 view = self.client[:]
255 view = self.client[:]
254 ar = view.scatter('a', a, block=False)
256 ar = view.scatter('a', a, block=False)
255 self.assertTrue(isinstance(ar, AsyncResult))
257 self.assertTrue(isinstance(ar, AsyncResult))
256 amr = view.gather('a', block=False)
258 amr = view.gather('a', block=False)
257 self.assertTrue(isinstance(amr, AsyncMapResult))
259 self.assertTrue(isinstance(amr, AsyncMapResult))
258 assert_array_equal(amr.get(), a)
260 assert_array_equal(amr.get(), a)
259
261
260 def test_execute(self):
262 def test_execute(self):
261 view = self.client[:]
263 view = self.client[:]
262 # self.client.debug=True
264 # self.client.debug=True
263 execute = view.execute
265 execute = view.execute
264 ar = execute('c=30', block=False)
266 ar = execute('c=30', block=False)
265 self.assertTrue(isinstance(ar, AsyncResult))
267 self.assertTrue(isinstance(ar, AsyncResult))
266 ar = execute('d=[0,1,2]', block=False)
268 ar = execute('d=[0,1,2]', block=False)
267 self.client.wait(ar, 1)
269 self.client.wait(ar, 1)
268 self.assertEquals(len(ar.get()), len(self.client))
270 self.assertEquals(len(ar.get()), len(self.client))
269 for c in view['c']:
271 for c in view['c']:
270 self.assertEquals(c, 30)
272 self.assertEquals(c, 30)
271
273
272 def test_abort(self):
274 def test_abort(self):
273 view = self.client[-1]
275 view = self.client[-1]
274 ar = view.execute('import time; time.sleep(0.25)', block=False)
276 ar = view.execute('import time; time.sleep(0.25)', block=False)
275 ar2 = view.apply_async(lambda : 2)
277 ar2 = view.apply_async(lambda : 2)
276 ar3 = view.apply_async(lambda : 3)
278 ar3 = view.apply_async(lambda : 3)
277 view.abort(ar2)
279 view.abort(ar2)
278 view.abort(ar3.msg_ids)
280 view.abort(ar3.msg_ids)
279 self.assertRaises(error.TaskAborted, ar2.get)
281 self.assertRaises(error.TaskAborted, ar2.get)
280 self.assertRaises(error.TaskAborted, ar3.get)
282 self.assertRaises(error.TaskAborted, ar3.get)
281
283
282 def test_temp_flags(self):
284 def test_temp_flags(self):
283 view = self.client[-1]
285 view = self.client[-1]
284 view.block=True
286 view.block=True
285 with view.temp_flags(block=False):
287 with view.temp_flags(block=False):
286 self.assertFalse(view.block)
288 self.assertFalse(view.block)
287 self.assertTrue(view.block)
289 self.assertTrue(view.block)
288
290
289 def test_importer(self):
291 def test_importer(self):
290 view = self.client[-1]
292 view = self.client[-1]
291 view.clear(block=True)
293 view.clear(block=True)
292 with view.importer:
294 with view.importer:
293 import re
295 import re
294
296
295 @interactive
297 @interactive
296 def findall(pat, s):
298 def findall(pat, s):
297 # this globals() step isn't necessary in real code
299 # this globals() step isn't necessary in real code
298 # only to prevent a closure in the test
300 # only to prevent a closure in the test
299 return globals()['re'].findall(pat, s)
301 return globals()['re'].findall(pat, s)
300
302
301 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
303 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
302
304
305 # parallel magic tests
306
307 def test_magic_px_blocking(self):
308 ip = get_ipython()
309 v = self.client[-1]
310 v.activate()
311 v.block=True
312
313 ip.magic_px('a=5')
314 self.assertEquals(v['a'], 5)
315 ip.magic_px('a=10')
316 self.assertEquals(v['a'], 10)
317 sio = StringIO()
318 savestdout = sys.stdout
319 sys.stdout = sio
320 ip.magic_px('print a')
321 sys.stdout = savestdout
322 sio.read()
323 self.assertTrue('[stdout:%i]'%v.targets in sio.buf)
324 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
325
326 def test_magic_px_nonblocking(self):
327 ip = get_ipython()
328 v = self.client[-1]
329 v.activate()
330 v.block=False
331
332 ip.magic_px('a=5')
333 self.assertEquals(v['a'], 5)
334 ip.magic_px('a=10')
335 self.assertEquals(v['a'], 10)
336 sio = StringIO()
337 savestdout = sys.stdout
338 sys.stdout = sio
339 ip.magic_px('print a')
340 sys.stdout = savestdout
341 sio.read()
342 self.assertFalse('[stdout:%i]'%v.targets in sio.buf)
343 ip.magic_px('1/0')
344 ar = v.get_result(-1)
345 self.assertRaisesRemote(ZeroDivisionError, ar.get)
346
347 def test_magic_autopx_blocking(self):
348 ip = get_ipython()
349 v = self.client[-1]
350 v.activate()
351 v.block=True
352
353 sio = StringIO()
354 savestdout = sys.stdout
355 sys.stdout = sio
356 ip.magic_autopx()
357 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
358 ip.run_cell('print b')
359 ip.run_cell("b/c")
360 ip.run_code(compile('b*=2', '', 'single'))
361 ip.magic_autopx()
362 sys.stdout = savestdout
363 sio.read()
364 output = sio.buf.strip()
365 self.assertTrue(output.startswith('%autopx enabled'))
366 self.assertTrue(output.endswith('%autopx disabled'))
367 self.assertTrue('RemoteError: ZeroDivisionError' in output)
368 ar = v.get_result(-2)
369 self.assertEquals(v['a'], 5)
370 self.assertEquals(v['b'], 20)
371 self.assertRaisesRemote(ZeroDivisionError, ar.get)
372
373 def test_magic_autopx_nonblocking(self):
374 ip = get_ipython()
375 v = self.client[-1]
376 v.activate()
377 v.block=False
378
379 sio = StringIO()
380 savestdout = sys.stdout
381 sys.stdout = sio
382 ip.magic_autopx()
383 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
384 ip.run_cell('print b')
385 ip.run_cell("b/c")
386 ip.run_code(compile('b*=2', '', 'single'))
387 ip.magic_autopx()
388 sys.stdout = savestdout
389 sio.read()
390 output = sio.buf.strip()
391 self.assertTrue(output.startswith('%autopx enabled'))
392 self.assertTrue(output.endswith('%autopx disabled'))
393 self.assertFalse('ZeroDivisionError' in output)
394 ar = v.get_result(-2)
395 self.assertEquals(v['a'], 5)
396 self.assertEquals(v['b'], 20)
397 self.assertRaisesRemote(ZeroDivisionError, ar.get)
398
399 def test_magic_result(self):
400 ip = get_ipython()
401 v = self.client[-1]
402 v.activate()
403 v['a'] = 111
404 ra = v['a']
405
406 ar = ip.magic_result()
407 self.assertEquals(ar.msg_ids, [v.history[-1]])
408 self.assertEquals(ar.get(), 111)
409 ar = ip.magic_result('-2')
410 self.assertEquals(ar.msg_ids, [v.history[-2]])
411
412
413
General Comments 0
You need to be logged in to leave comments. Login now