##// END OF EJS Templates
fix %px magic output for single target...
MinRK -
Show More
@@ -1,294 +1,300 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Magic command interface for interactive parallel work."""
4 """Magic command interface for interactive parallel work."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import ast
17 import ast
18 import re
18 import re
19
19
20 from IPython.core.plugin import Plugin
20 from IPython.core.plugin import Plugin
21 from IPython.utils.traitlets import Bool, Any, Instance
21 from IPython.utils.traitlets import Bool, Any, Instance
22 from IPython.testing.skipdoctest import skip_doctest
22 from IPython.testing.skipdoctest import skip_doctest
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Definitions of magic functions for use with IPython
25 # Definitions of magic functions for use with IPython
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28
28
29 NO_ACTIVE_VIEW = """
29 NO_ACTIVE_VIEW = """
30 Use activate() on a DirectView object to activate it for magics.
30 Use activate() on a DirectView object to activate it for magics.
31 """
31 """
32
32
33
33
34 class ParalleMagic(Plugin):
34 class ParalleMagic(Plugin):
35 """A component to manage the %result, %px and %autopx magics."""
35 """A component to manage the %result, %px and %autopx magics."""
36
36
37 active_view = Instance('IPython.parallel.client.view.DirectView')
37 active_view = Instance('IPython.parallel.client.view.DirectView')
38 verbose = Bool(False, config=True)
38 verbose = Bool(False, config=True)
39 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
39 shell = Instance('IPython.core.interactiveshell.InteractiveShellABC')
40
40
41 def __init__(self, shell=None, config=None):
41 def __init__(self, shell=None, config=None):
42 super(ParalleMagic, self).__init__(shell=shell, config=config)
42 super(ParalleMagic, self).__init__(shell=shell, config=config)
43 self._define_magics()
43 self._define_magics()
44 # A flag showing if autopx is activated or not
44 # A flag showing if autopx is activated or not
45 self.autopx = False
45 self.autopx = False
46
46
47 def _define_magics(self):
47 def _define_magics(self):
48 """Define the magic functions."""
48 """Define the magic functions."""
49 self.shell.define_magic('result', self.magic_result)
49 self.shell.define_magic('result', self.magic_result)
50 self.shell.define_magic('px', self.magic_px)
50 self.shell.define_magic('px', self.magic_px)
51 self.shell.define_magic('autopx', self.magic_autopx)
51 self.shell.define_magic('autopx', self.magic_autopx)
52
52
53 @skip_doctest
53 @skip_doctest
54 def magic_result(self, ipself, parameter_s=''):
54 def magic_result(self, ipself, parameter_s=''):
55 """Print the result of command i on all engines..
55 """Print the result of command i on all engines..
56
56
57 To use this a :class:`DirectView` instance must be created
57 To use this a :class:`DirectView` instance must be created
58 and then activated by calling its :meth:`activate` method.
58 and then activated by calling its :meth:`activate` method.
59
59
60 Then you can do the following::
60 Then you can do the following::
61
61
62 In [23]: %result
62 In [23]: %result
63 Out[23]:
63 Out[23]:
64 <Results List>
64 <Results List>
65 [0] In [6]: a = 10
65 [0] In [6]: a = 10
66 [1] In [6]: a = 10
66 [1] In [6]: a = 10
67
67
68 In [22]: %result 6
68 In [22]: %result 6
69 Out[22]:
69 Out[22]:
70 <Results List>
70 <Results List>
71 [0] In [6]: a = 10
71 [0] In [6]: a = 10
72 [1] In [6]: a = 10
72 [1] In [6]: a = 10
73 """
73 """
74 if self.active_view is None:
74 if self.active_view is None:
75 print NO_ACTIVE_VIEW
75 print NO_ACTIVE_VIEW
76 return
76 return
77
77
78 try:
78 try:
79 index = int(parameter_s)
79 index = int(parameter_s)
80 except:
80 except:
81 index = None
81 index = None
82 result = self.active_view.get_result(index)
82 result = self.active_view.get_result(index)
83 return result
83 return result
84
84
85 @skip_doctest
85 @skip_doctest
86 def magic_px(self, ipself, parameter_s=''):
86 def magic_px(self, ipself, parameter_s=''):
87 """Executes the given python command in parallel.
87 """Executes the given python command in parallel.
88
88
89 To use this a :class:`DirectView` instance must be created
89 To use this a :class:`DirectView` instance must be created
90 and then activated by calling its :meth:`activate` method.
90 and then activated by calling its :meth:`activate` method.
91
91
92 Then you can do the following::
92 Then you can do the following::
93
93
94 In [24]: %px a = 5
94 In [24]: %px a = 5
95 Parallel execution on engines: all
95 Parallel execution on engine(s): all
96 Out[24]:
96 Out[24]:
97 <Results List>
97 <Results List>
98 [0] In [7]: a = 5
98 [0] In [7]: a = 5
99 [1] In [7]: a = 5
99 [1] In [7]: a = 5
100 """
100 """
101
101
102 if self.active_view is None:
102 if self.active_view is None:
103 print NO_ACTIVE_VIEW
103 print NO_ACTIVE_VIEW
104 return
104 return
105 print "Parallel execution on engines: %s" % self.active_view.targets
105 print "Parallel execution on engine(s): %s" % self.active_view.targets
106 result = self.active_view.execute(parameter_s, block=False)
106 result = self.active_view.execute(parameter_s, block=False)
107 if self.active_view.block:
107 if self.active_view.block:
108 result.get()
108 result.get()
109 self._maybe_display_output(result)
109 self._maybe_display_output(result)
110
110
111 @skip_doctest
111 @skip_doctest
112 def magic_autopx(self, ipself, parameter_s=''):
112 def magic_autopx(self, ipself, parameter_s=''):
113 """Toggles auto parallel mode.
113 """Toggles auto parallel mode.
114
114
115 To use this a :class:`DirectView` instance must be created
115 To use this a :class:`DirectView` instance must be created
116 and then activated by calling its :meth:`activate` method. Once this
116 and then activated by calling its :meth:`activate` method. Once this
117 is called, all commands typed at the command line are send to
117 is called, all commands typed at the command line are send to
118 the engines to be executed in parallel. To control which engine
118 the engines to be executed in parallel. To control which engine
119 are used, set the ``targets`` attributed of the multiengine client
119 are used, set the ``targets`` attributed of the multiengine client
120 before entering ``%autopx`` mode.
120 before entering ``%autopx`` mode.
121
121
122 Then you can do the following::
122 Then you can do the following::
123
123
124 In [25]: %autopx
124 In [25]: %autopx
125 %autopx to enabled
125 %autopx to enabled
126
126
127 In [26]: a = 10
127 In [26]: a = 10
128 Parallel execution on engines: [0,1,2,3]
128 Parallel execution on engine(s): [0,1,2,3]
129 In [27]: print a
129 In [27]: print a
130 Parallel execution on engines: [0,1,2,3]
130 Parallel execution on engine(s): [0,1,2,3]
131 [stdout:0] 10
131 [stdout:0] 10
132 [stdout:1] 10
132 [stdout:1] 10
133 [stdout:2] 10
133 [stdout:2] 10
134 [stdout:3] 10
134 [stdout:3] 10
135
135
136
136
137 In [27]: %autopx
137 In [27]: %autopx
138 %autopx disabled
138 %autopx disabled
139 """
139 """
140 if self.autopx:
140 if self.autopx:
141 self._disable_autopx()
141 self._disable_autopx()
142 else:
142 else:
143 self._enable_autopx()
143 self._enable_autopx()
144
144
145 def _enable_autopx(self):
145 def _enable_autopx(self):
146 """Enable %autopx mode by saving the original run_cell and installing
146 """Enable %autopx mode by saving the original run_cell and installing
147 pxrun_cell.
147 pxrun_cell.
148 """
148 """
149 if self.active_view is None:
149 if self.active_view is None:
150 print NO_ACTIVE_VIEW
150 print NO_ACTIVE_VIEW
151 return
151 return
152
152
153 # override run_cell and run_code
153 # override run_cell and run_code
154 self._original_run_cell = self.shell.run_cell
154 self._original_run_cell = self.shell.run_cell
155 self.shell.run_cell = self.pxrun_cell
155 self.shell.run_cell = self.pxrun_cell
156 self._original_run_code = self.shell.run_code
156 self._original_run_code = self.shell.run_code
157 self.shell.run_code = self.pxrun_code
157 self.shell.run_code = self.pxrun_code
158
158
159 self.autopx = True
159 self.autopx = True
160 print "%autopx enabled"
160 print "%autopx enabled"
161
161
162 def _disable_autopx(self):
162 def _disable_autopx(self):
163 """Disable %autopx by restoring the original InteractiveShell.run_cell.
163 """Disable %autopx by restoring the original InteractiveShell.run_cell.
164 """
164 """
165 if self.autopx:
165 if self.autopx:
166 self.shell.run_cell = self._original_run_cell
166 self.shell.run_cell = self._original_run_cell
167 self.shell.run_code = self._original_run_code
167 self.shell.run_code = self._original_run_code
168 self.autopx = False
168 self.autopx = False
169 print "%autopx disabled"
169 print "%autopx disabled"
170
170
171 def _maybe_display_output(self, result):
171 def _maybe_display_output(self, result):
172 """Maybe display the output of a parallel result.
172 """Maybe display the output of a parallel result.
173
173
174 If self.active_view.block is True, wait for the result
174 If self.active_view.block is True, wait for the result
175 and display the result. Otherwise, this is a noop.
175 and display the result. Otherwise, this is a noop.
176 """
176 """
177 if isinstance(result.stdout, basestring):
178 # single result
179 stdouts = [result.stdout.rstrip()]
180 else:
181 stdouts = [s.rstrip() for s in result.stdout]
182
177 targets = self.active_view.targets
183 targets = self.active_view.targets
178 if isinstance(targets, int):
184 if isinstance(targets, int):
179 targets = [targets]
185 targets = [targets]
180 if targets == 'all':
186 elif targets == 'all':
181 targets = self.active_view.client.ids
187 targets = self.active_view.client.ids
182 stdout = [s.rstrip() for s in result.stdout]
188
183 if any(stdout):
189 if any(stdouts):
184 for i,eid in enumerate(targets):
190 for eid,stdout in zip(targets, stdouts):
185 print '[stdout:%i]'%eid, stdout[i]
191 print '[stdout:%i]'%eid, stdout
186
192
187
193
188 def pxrun_cell(self, raw_cell, store_history=True):
194 def pxrun_cell(self, raw_cell, store_history=True):
189 """drop-in replacement for InteractiveShell.run_cell.
195 """drop-in replacement for InteractiveShell.run_cell.
190
196
191 This executes code remotely, instead of in the local namespace.
197 This executes code remotely, instead of in the local namespace.
192
198
193 See InteractiveShell.run_cell for details.
199 See InteractiveShell.run_cell for details.
194 """
200 """
195
201
196 if (not raw_cell) or raw_cell.isspace():
202 if (not raw_cell) or raw_cell.isspace():
197 return
203 return
198
204
199 ipself = self.shell
205 ipself = self.shell
200
206
201 with ipself.builtin_trap:
207 with ipself.builtin_trap:
202 cell = ipself.prefilter_manager.prefilter_lines(raw_cell)
208 cell = ipself.prefilter_manager.prefilter_lines(raw_cell)
203
209
204 # Store raw and processed history
210 # Store raw and processed history
205 if store_history:
211 if store_history:
206 ipself.history_manager.store_inputs(ipself.execution_count,
212 ipself.history_manager.store_inputs(ipself.execution_count,
207 cell, raw_cell)
213 cell, raw_cell)
208
214
209 # ipself.logger.log(cell, raw_cell)
215 # ipself.logger.log(cell, raw_cell)
210
216
211 cell_name = ipself.compile.cache(cell, ipself.execution_count)
217 cell_name = ipself.compile.cache(cell, ipself.execution_count)
212
218
213 try:
219 try:
214 code_ast = ast.parse(cell, filename=cell_name)
220 code_ast = ast.parse(cell, filename=cell_name)
215 except (OverflowError, SyntaxError, ValueError, TypeError, MemoryError):
221 except (OverflowError, SyntaxError, ValueError, TypeError, MemoryError):
216 # Case 1
222 # Case 1
217 ipself.showsyntaxerror()
223 ipself.showsyntaxerror()
218 ipself.execution_count += 1
224 ipself.execution_count += 1
219 return None
225 return None
220 except NameError:
226 except NameError:
221 # ignore name errors, because we don't know the remote keys
227 # ignore name errors, because we don't know the remote keys
222 pass
228 pass
223
229
224 if store_history:
230 if store_history:
225 # Write output to the database. Does nothing unless
231 # Write output to the database. Does nothing unless
226 # history output logging is enabled.
232 # history output logging is enabled.
227 ipself.history_manager.store_output(ipself.execution_count)
233 ipself.history_manager.store_output(ipself.execution_count)
228 # Each cell is a *single* input, regardless of how many lines it has
234 # Each cell is a *single* input, regardless of how many lines it has
229 ipself.execution_count += 1
235 ipself.execution_count += 1
230
236
231 if re.search(r'get_ipython\(\)\.magic\(u?"%?autopx', cell):
237 if re.search(r'get_ipython\(\)\.magic\(u?"%?autopx', cell):
232 self._disable_autopx()
238 self._disable_autopx()
233 return False
239 return False
234 else:
240 else:
235 try:
241 try:
236 result = self.active_view.execute(cell, block=False)
242 result = self.active_view.execute(cell, block=False)
237 except:
243 except:
238 ipself.showtraceback()
244 ipself.showtraceback()
239 return True
245 return True
240 else:
246 else:
241 if self.active_view.block:
247 if self.active_view.block:
242 try:
248 try:
243 result.get()
249 result.get()
244 except:
250 except:
245 self.shell.showtraceback()
251 self.shell.showtraceback()
246 return True
252 return True
247 else:
253 else:
248 self._maybe_display_output(result)
254 self._maybe_display_output(result)
249 return False
255 return False
250
256
251 def pxrun_code(self, code_obj):
257 def pxrun_code(self, code_obj):
252 """drop-in replacement for InteractiveShell.run_code.
258 """drop-in replacement for InteractiveShell.run_code.
253
259
254 This executes code remotely, instead of in the local namespace.
260 This executes code remotely, instead of in the local namespace.
255
261
256 See InteractiveShell.run_code for details.
262 See InteractiveShell.run_code for details.
257 """
263 """
258 ipself = self.shell
264 ipself = self.shell
259 # check code object for the autopx magic
265 # check code object for the autopx magic
260 if 'get_ipython' in code_obj.co_names and 'magic' in code_obj.co_names and \
266 if 'get_ipython' in code_obj.co_names and 'magic' in code_obj.co_names and \
261 any( [ isinstance(c, basestring) and 'autopx' in c for c in code_obj.co_consts ]):
267 any( [ isinstance(c, basestring) and 'autopx' in c for c in code_obj.co_consts ]):
262 self._disable_autopx()
268 self._disable_autopx()
263 return False
269 return False
264 else:
270 else:
265 try:
271 try:
266 result = self.active_view.execute(code_obj, block=False)
272 result = self.active_view.execute(code_obj, block=False)
267 except:
273 except:
268 ipself.showtraceback()
274 ipself.showtraceback()
269 return True
275 return True
270 else:
276 else:
271 if self.active_view.block:
277 if self.active_view.block:
272 try:
278 try:
273 result.get()
279 result.get()
274 except:
280 except:
275 self.shell.showtraceback()
281 self.shell.showtraceback()
276 return True
282 return True
277 else:
283 else:
278 self._maybe_display_output(result)
284 self._maybe_display_output(result)
279 return False
285 return False
280
286
281
287
282
288
283
289
284 _loaded = False
290 _loaded = False
285
291
286
292
287 def load_ipython_extension(ip):
293 def load_ipython_extension(ip):
288 """Load the extension in IPython."""
294 """Load the extension in IPython."""
289 global _loaded
295 global _loaded
290 if not _loaded:
296 if not _loaded:
291 plugin = ParalleMagic(shell=ip, config=ip.config)
297 plugin = ParalleMagic(shell=ip, config=ip.config)
292 ip.plugin_manager.register_plugin('parallelmagic', plugin)
298 ip.plugin_manager.register_plugin('parallelmagic', plugin)
293 _loaded = True
299 _loaded = True
294
300
@@ -1,440 +1,441 b''
1 """test View objects"""
1 """test View objects"""
2 # -*- coding: utf-8 -*-
2 # -*- coding: utf-8 -*-
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
4 # Copyright (C) 2011 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import sys
14 import sys
15 import time
15 import time
16 from tempfile import mktemp
16 from tempfile import mktemp
17 from StringIO import StringIO
17 from StringIO import StringIO
18
18
19 import zmq
19 import zmq
20
20
21 from IPython import parallel as pmod
21 from IPython import parallel as pmod
22 from IPython.parallel import error
22 from IPython.parallel import error
23 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
23 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
24 from IPython.parallel import DirectView
24 from IPython.parallel import DirectView
25 from IPython.parallel.util import interactive
25 from IPython.parallel.util import interactive
26
26
27 from IPython.parallel.tests import add_engines
27 from IPython.parallel.tests import add_engines
28
28
29 from .clienttest import ClusterTestCase, crash, wait, skip_without
29 from .clienttest import ClusterTestCase, crash, wait, skip_without
30
30
31 def setup():
31 def setup():
32 add_engines(3)
32 add_engines(3)
33
33
34 class TestView(ClusterTestCase):
34 class TestView(ClusterTestCase):
35
35
36 def test_z_crash_mux(self):
36 def test_z_crash_mux(self):
37 """test graceful handling of engine death (direct)"""
37 """test graceful handling of engine death (direct)"""
38 # self.add_engines(1)
38 # self.add_engines(1)
39 eid = self.client.ids[-1]
39 eid = self.client.ids[-1]
40 ar = self.client[eid].apply_async(crash)
40 ar = self.client[eid].apply_async(crash)
41 self.assertRaisesRemote(error.EngineError, ar.get)
41 self.assertRaisesRemote(error.EngineError, ar.get)
42 eid = ar.engine_id
42 eid = ar.engine_id
43 tic = time.time()
43 tic = time.time()
44 while eid in self.client.ids and time.time()-tic < 5:
44 while eid in self.client.ids and time.time()-tic < 5:
45 time.sleep(.01)
45 time.sleep(.01)
46 self.client.spin()
46 self.client.spin()
47 self.assertFalse(eid in self.client.ids, "Engine should have died")
47 self.assertFalse(eid in self.client.ids, "Engine should have died")
48
48
49 def test_push_pull(self):
49 def test_push_pull(self):
50 """test pushing and pulling"""
50 """test pushing and pulling"""
51 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
51 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
52 t = self.client.ids[-1]
52 t = self.client.ids[-1]
53 v = self.client[t]
53 v = self.client[t]
54 push = v.push
54 push = v.push
55 pull = v.pull
55 pull = v.pull
56 v.block=True
56 v.block=True
57 nengines = len(self.client)
57 nengines = len(self.client)
58 push({'data':data})
58 push({'data':data})
59 d = pull('data')
59 d = pull('data')
60 self.assertEquals(d, data)
60 self.assertEquals(d, data)
61 self.client[:].push({'data':data})
61 self.client[:].push({'data':data})
62 d = self.client[:].pull('data', block=True)
62 d = self.client[:].pull('data', block=True)
63 self.assertEquals(d, nengines*[data])
63 self.assertEquals(d, nengines*[data])
64 ar = push({'data':data}, block=False)
64 ar = push({'data':data}, block=False)
65 self.assertTrue(isinstance(ar, AsyncResult))
65 self.assertTrue(isinstance(ar, AsyncResult))
66 r = ar.get()
66 r = ar.get()
67 ar = self.client[:].pull('data', block=False)
67 ar = self.client[:].pull('data', block=False)
68 self.assertTrue(isinstance(ar, AsyncResult))
68 self.assertTrue(isinstance(ar, AsyncResult))
69 r = ar.get()
69 r = ar.get()
70 self.assertEquals(r, nengines*[data])
70 self.assertEquals(r, nengines*[data])
71 self.client[:].push(dict(a=10,b=20))
71 self.client[:].push(dict(a=10,b=20))
72 r = self.client[:].pull(('a','b'), block=True)
72 r = self.client[:].pull(('a','b'), block=True)
73 self.assertEquals(r, nengines*[[10,20]])
73 self.assertEquals(r, nengines*[[10,20]])
74
74
75 def test_push_pull_function(self):
75 def test_push_pull_function(self):
76 "test pushing and pulling functions"
76 "test pushing and pulling functions"
77 def testf(x):
77 def testf(x):
78 return 2.0*x
78 return 2.0*x
79
79
80 t = self.client.ids[-1]
80 t = self.client.ids[-1]
81 v = self.client[t]
81 v = self.client[t]
82 v.block=True
82 v.block=True
83 push = v.push
83 push = v.push
84 pull = v.pull
84 pull = v.pull
85 execute = v.execute
85 execute = v.execute
86 push({'testf':testf})
86 push({'testf':testf})
87 r = pull('testf')
87 r = pull('testf')
88 self.assertEqual(r(1.0), testf(1.0))
88 self.assertEqual(r(1.0), testf(1.0))
89 execute('r = testf(10)')
89 execute('r = testf(10)')
90 r = pull('r')
90 r = pull('r')
91 self.assertEquals(r, testf(10))
91 self.assertEquals(r, testf(10))
92 ar = self.client[:].push({'testf':testf}, block=False)
92 ar = self.client[:].push({'testf':testf}, block=False)
93 ar.get()
93 ar.get()
94 ar = self.client[:].pull('testf', block=False)
94 ar = self.client[:].pull('testf', block=False)
95 rlist = ar.get()
95 rlist = ar.get()
96 for r in rlist:
96 for r in rlist:
97 self.assertEqual(r(1.0), testf(1.0))
97 self.assertEqual(r(1.0), testf(1.0))
98 execute("def g(x): return x*x")
98 execute("def g(x): return x*x")
99 r = pull(('testf','g'))
99 r = pull(('testf','g'))
100 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
100 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
101
101
102 def test_push_function_globals(self):
102 def test_push_function_globals(self):
103 """test that pushed functions have access to globals"""
103 """test that pushed functions have access to globals"""
104 @interactive
104 @interactive
105 def geta():
105 def geta():
106 return a
106 return a
107 # self.add_engines(1)
107 # self.add_engines(1)
108 v = self.client[-1]
108 v = self.client[-1]
109 v.block=True
109 v.block=True
110 v['f'] = geta
110 v['f'] = geta
111 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
111 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
112 v.execute('a=5')
112 v.execute('a=5')
113 v.execute('b=f()')
113 v.execute('b=f()')
114 self.assertEquals(v['b'], 5)
114 self.assertEquals(v['b'], 5)
115
115
116 def test_push_function_defaults(self):
116 def test_push_function_defaults(self):
117 """test that pushed functions preserve default args"""
117 """test that pushed functions preserve default args"""
118 def echo(a=10):
118 def echo(a=10):
119 return a
119 return a
120 v = self.client[-1]
120 v = self.client[-1]
121 v.block=True
121 v.block=True
122 v['f'] = echo
122 v['f'] = echo
123 v.execute('b=f()')
123 v.execute('b=f()')
124 self.assertEquals(v['b'], 10)
124 self.assertEquals(v['b'], 10)
125
125
126 def test_get_result(self):
126 def test_get_result(self):
127 """test getting results from the Hub."""
127 """test getting results from the Hub."""
128 c = pmod.Client(profile='iptest')
128 c = pmod.Client(profile='iptest')
129 # self.add_engines(1)
129 # self.add_engines(1)
130 t = c.ids[-1]
130 t = c.ids[-1]
131 v = c[t]
131 v = c[t]
132 v2 = self.client[t]
132 v2 = self.client[t]
133 ar = v.apply_async(wait, 1)
133 ar = v.apply_async(wait, 1)
134 # give the monitor time to notice the message
134 # give the monitor time to notice the message
135 time.sleep(.25)
135 time.sleep(.25)
136 ahr = v2.get_result(ar.msg_ids)
136 ahr = v2.get_result(ar.msg_ids)
137 self.assertTrue(isinstance(ahr, AsyncHubResult))
137 self.assertTrue(isinstance(ahr, AsyncHubResult))
138 self.assertEquals(ahr.get(), ar.get())
138 self.assertEquals(ahr.get(), ar.get())
139 ar2 = v2.get_result(ar.msg_ids)
139 ar2 = v2.get_result(ar.msg_ids)
140 self.assertFalse(isinstance(ar2, AsyncHubResult))
140 self.assertFalse(isinstance(ar2, AsyncHubResult))
141 c.spin()
141 c.spin()
142 c.close()
142 c.close()
143
143
144 def test_run_newline(self):
144 def test_run_newline(self):
145 """test that run appends newline to files"""
145 """test that run appends newline to files"""
146 tmpfile = mktemp()
146 tmpfile = mktemp()
147 with open(tmpfile, 'w') as f:
147 with open(tmpfile, 'w') as f:
148 f.write("""def g():
148 f.write("""def g():
149 return 5
149 return 5
150 """)
150 """)
151 v = self.client[-1]
151 v = self.client[-1]
152 v.run(tmpfile, block=True)
152 v.run(tmpfile, block=True)
153 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
153 self.assertEquals(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
154
154
155 def test_apply_tracked(self):
155 def test_apply_tracked(self):
156 """test tracking for apply"""
156 """test tracking for apply"""
157 # self.add_engines(1)
157 # self.add_engines(1)
158 t = self.client.ids[-1]
158 t = self.client.ids[-1]
159 v = self.client[t]
159 v = self.client[t]
160 v.block=False
160 v.block=False
161 def echo(n=1024*1024, **kwargs):
161 def echo(n=1024*1024, **kwargs):
162 with v.temp_flags(**kwargs):
162 with v.temp_flags(**kwargs):
163 return v.apply(lambda x: x, 'x'*n)
163 return v.apply(lambda x: x, 'x'*n)
164 ar = echo(1, track=False)
164 ar = echo(1, track=False)
165 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
165 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
166 self.assertTrue(ar.sent)
166 self.assertTrue(ar.sent)
167 ar = echo(track=True)
167 ar = echo(track=True)
168 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
168 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
169 self.assertEquals(ar.sent, ar._tracker.done)
169 self.assertEquals(ar.sent, ar._tracker.done)
170 ar._tracker.wait()
170 ar._tracker.wait()
171 self.assertTrue(ar.sent)
171 self.assertTrue(ar.sent)
172
172
173 def test_push_tracked(self):
173 def test_push_tracked(self):
174 t = self.client.ids[-1]
174 t = self.client.ids[-1]
175 ns = dict(x='x'*1024*1024)
175 ns = dict(x='x'*1024*1024)
176 v = self.client[t]
176 v = self.client[t]
177 ar = v.push(ns, block=False, track=False)
177 ar = v.push(ns, block=False, track=False)
178 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
178 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
179 self.assertTrue(ar.sent)
179 self.assertTrue(ar.sent)
180
180
181 ar = v.push(ns, block=False, track=True)
181 ar = v.push(ns, block=False, track=True)
182 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
182 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
183 self.assertEquals(ar.sent, ar._tracker.done)
183 self.assertEquals(ar.sent, ar._tracker.done)
184 ar._tracker.wait()
184 ar._tracker.wait()
185 self.assertTrue(ar.sent)
185 self.assertTrue(ar.sent)
186 ar.get()
186 ar.get()
187
187
188 def test_scatter_tracked(self):
188 def test_scatter_tracked(self):
189 t = self.client.ids
189 t = self.client.ids
190 x='x'*1024*1024
190 x='x'*1024*1024
191 ar = self.client[t].scatter('x', x, block=False, track=False)
191 ar = self.client[t].scatter('x', x, block=False, track=False)
192 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
193 self.assertTrue(ar.sent)
193 self.assertTrue(ar.sent)
194
194
195 ar = self.client[t].scatter('x', x, block=False, track=True)
195 ar = self.client[t].scatter('x', x, block=False, track=True)
196 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
196 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
197 self.assertEquals(ar.sent, ar._tracker.done)
197 self.assertEquals(ar.sent, ar._tracker.done)
198 ar._tracker.wait()
198 ar._tracker.wait()
199 self.assertTrue(ar.sent)
199 self.assertTrue(ar.sent)
200 ar.get()
200 ar.get()
201
201
202 def test_remote_reference(self):
202 def test_remote_reference(self):
203 v = self.client[-1]
203 v = self.client[-1]
204 v['a'] = 123
204 v['a'] = 123
205 ra = pmod.Reference('a')
205 ra = pmod.Reference('a')
206 b = v.apply_sync(lambda x: x, ra)
206 b = v.apply_sync(lambda x: x, ra)
207 self.assertEquals(b, 123)
207 self.assertEquals(b, 123)
208
208
209
209
210 def test_scatter_gather(self):
210 def test_scatter_gather(self):
211 view = self.client[:]
211 view = self.client[:]
212 seq1 = range(16)
212 seq1 = range(16)
213 view.scatter('a', seq1)
213 view.scatter('a', seq1)
214 seq2 = view.gather('a', block=True)
214 seq2 = view.gather('a', block=True)
215 self.assertEquals(seq2, seq1)
215 self.assertEquals(seq2, seq1)
216 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
216 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
217
217
218 @skip_without('numpy')
218 @skip_without('numpy')
219 def test_scatter_gather_numpy(self):
219 def test_scatter_gather_numpy(self):
220 import numpy
220 import numpy
221 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
221 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
222 view = self.client[:]
222 view = self.client[:]
223 a = numpy.arange(64)
223 a = numpy.arange(64)
224 view.scatter('a', a)
224 view.scatter('a', a)
225 b = view.gather('a', block=True)
225 b = view.gather('a', block=True)
226 assert_array_equal(b, a)
226 assert_array_equal(b, a)
227
227
228 def test_map(self):
228 def test_map(self):
229 view = self.client[:]
229 view = self.client[:]
230 def f(x):
230 def f(x):
231 return x**2
231 return x**2
232 data = range(16)
232 data = range(16)
233 r = view.map_sync(f, data)
233 r = view.map_sync(f, data)
234 self.assertEquals(r, map(f, data))
234 self.assertEquals(r, map(f, data))
235
235
236 def test_scatterGatherNonblocking(self):
236 def test_scatterGatherNonblocking(self):
237 data = range(16)
237 data = range(16)
238 view = self.client[:]
238 view = self.client[:]
239 view.scatter('a', data, block=False)
239 view.scatter('a', data, block=False)
240 ar = view.gather('a', block=False)
240 ar = view.gather('a', block=False)
241 self.assertEquals(ar.get(), data)
241 self.assertEquals(ar.get(), data)
242
242
243 @skip_without('numpy')
243 @skip_without('numpy')
244 def test_scatter_gather_numpy_nonblocking(self):
244 def test_scatter_gather_numpy_nonblocking(self):
245 import numpy
245 import numpy
246 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
246 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
247 a = numpy.arange(64)
247 a = numpy.arange(64)
248 view = self.client[:]
248 view = self.client[:]
249 ar = view.scatter('a', a, block=False)
249 ar = view.scatter('a', a, block=False)
250 self.assertTrue(isinstance(ar, AsyncResult))
250 self.assertTrue(isinstance(ar, AsyncResult))
251 amr = view.gather('a', block=False)
251 amr = view.gather('a', block=False)
252 self.assertTrue(isinstance(amr, AsyncMapResult))
252 self.assertTrue(isinstance(amr, AsyncMapResult))
253 assert_array_equal(amr.get(), a)
253 assert_array_equal(amr.get(), a)
254
254
255 def test_execute(self):
255 def test_execute(self):
256 view = self.client[:]
256 view = self.client[:]
257 # self.client.debug=True
257 # self.client.debug=True
258 execute = view.execute
258 execute = view.execute
259 ar = execute('c=30', block=False)
259 ar = execute('c=30', block=False)
260 self.assertTrue(isinstance(ar, AsyncResult))
260 self.assertTrue(isinstance(ar, AsyncResult))
261 ar = execute('d=[0,1,2]', block=False)
261 ar = execute('d=[0,1,2]', block=False)
262 self.client.wait(ar, 1)
262 self.client.wait(ar, 1)
263 self.assertEquals(len(ar.get()), len(self.client))
263 self.assertEquals(len(ar.get()), len(self.client))
264 for c in view['c']:
264 for c in view['c']:
265 self.assertEquals(c, 30)
265 self.assertEquals(c, 30)
266
266
267 def test_abort(self):
267 def test_abort(self):
268 view = self.client[-1]
268 view = self.client[-1]
269 ar = view.execute('import time; time.sleep(0.25)', block=False)
269 ar = view.execute('import time; time.sleep(0.25)', block=False)
270 ar2 = view.apply_async(lambda : 2)
270 ar2 = view.apply_async(lambda : 2)
271 ar3 = view.apply_async(lambda : 3)
271 ar3 = view.apply_async(lambda : 3)
272 view.abort(ar2)
272 view.abort(ar2)
273 view.abort(ar3.msg_ids)
273 view.abort(ar3.msg_ids)
274 self.assertRaises(error.TaskAborted, ar2.get)
274 self.assertRaises(error.TaskAborted, ar2.get)
275 self.assertRaises(error.TaskAborted, ar3.get)
275 self.assertRaises(error.TaskAborted, ar3.get)
276
276
277 def test_temp_flags(self):
277 def test_temp_flags(self):
278 view = self.client[-1]
278 view = self.client[-1]
279 view.block=True
279 view.block=True
280 with view.temp_flags(block=False):
280 with view.temp_flags(block=False):
281 self.assertFalse(view.block)
281 self.assertFalse(view.block)
282 self.assertTrue(view.block)
282 self.assertTrue(view.block)
283
283
284 def test_importer(self):
284 def test_importer(self):
285 view = self.client[-1]
285 view = self.client[-1]
286 view.clear(block=True)
286 view.clear(block=True)
287 with view.importer:
287 with view.importer:
288 import re
288 import re
289
289
290 @interactive
290 @interactive
291 def findall(pat, s):
291 def findall(pat, s):
292 # this globals() step isn't necessary in real code
292 # this globals() step isn't necessary in real code
293 # only to prevent a closure in the test
293 # only to prevent a closure in the test
294 re = globals()['re']
294 re = globals()['re']
295 return re.findall(pat, s)
295 return re.findall(pat, s)
296
296
297 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
297 self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
298
298
299 # parallel magic tests
299 # parallel magic tests
300
300
301 def test_magic_px_blocking(self):
301 def test_magic_px_blocking(self):
302 ip = get_ipython()
302 ip = get_ipython()
303 v = self.client[-1]
303 v = self.client[-1]
304 v.activate()
304 v.activate()
305 v.block=True
305 v.block=True
306
306
307 ip.magic_px('a=5')
307 ip.magic_px('a=5')
308 self.assertEquals(v['a'], 5)
308 self.assertEquals(v['a'], 5)
309 ip.magic_px('a=10')
309 ip.magic_px('a=10')
310 self.assertEquals(v['a'], 10)
310 self.assertEquals(v['a'], 10)
311 sio = StringIO()
311 sio = StringIO()
312 savestdout = sys.stdout
312 savestdout = sys.stdout
313 sys.stdout = sio
313 sys.stdout = sio
314 ip.magic_px('print a')
314 ip.magic_px('print a')
315 sys.stdout = savestdout
315 sys.stdout = savestdout
316 sio.read()
316 sio.read()
317 self.assertTrue('[stdout:%i]'%v.targets in sio.buf)
317 self.assertTrue('[stdout:%i]'%v.targets in sio.buf)
318 self.assertTrue(sio.buf.rstrip().endswith('10'))
318 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
319 self.assertRaisesRemote(ZeroDivisionError, ip.magic_px, '1/0')
319
320
320 def test_magic_px_nonblocking(self):
321 def test_magic_px_nonblocking(self):
321 ip = get_ipython()
322 ip = get_ipython()
322 v = self.client[-1]
323 v = self.client[-1]
323 v.activate()
324 v.activate()
324 v.block=False
325 v.block=False
325
326
326 ip.magic_px('a=5')
327 ip.magic_px('a=5')
327 self.assertEquals(v['a'], 5)
328 self.assertEquals(v['a'], 5)
328 ip.magic_px('a=10')
329 ip.magic_px('a=10')
329 self.assertEquals(v['a'], 10)
330 self.assertEquals(v['a'], 10)
330 sio = StringIO()
331 sio = StringIO()
331 savestdout = sys.stdout
332 savestdout = sys.stdout
332 sys.stdout = sio
333 sys.stdout = sio
333 ip.magic_px('print a')
334 ip.magic_px('print a')
334 sys.stdout = savestdout
335 sys.stdout = savestdout
335 sio.read()
336 sio.read()
336 self.assertFalse('[stdout:%i]'%v.targets in sio.buf)
337 self.assertFalse('[stdout:%i]'%v.targets in sio.buf)
337 ip.magic_px('1/0')
338 ip.magic_px('1/0')
338 ar = v.get_result(-1)
339 ar = v.get_result(-1)
339 self.assertRaisesRemote(ZeroDivisionError, ar.get)
340 self.assertRaisesRemote(ZeroDivisionError, ar.get)
340
341
341 def test_magic_autopx_blocking(self):
342 def test_magic_autopx_blocking(self):
342 ip = get_ipython()
343 ip = get_ipython()
343 v = self.client[-1]
344 v = self.client[-1]
344 v.activate()
345 v.activate()
345 v.block=True
346 v.block=True
346
347
347 sio = StringIO()
348 sio = StringIO()
348 savestdout = sys.stdout
349 savestdout = sys.stdout
349 sys.stdout = sio
350 sys.stdout = sio
350 ip.magic_autopx()
351 ip.magic_autopx()
351 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
352 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
352 ip.run_cell('print b')
353 ip.run_cell('print b')
353 ip.run_cell("b/c")
354 ip.run_cell("b/c")
354 ip.run_code(compile('b*=2', '', 'single'))
355 ip.run_code(compile('b*=2', '', 'single'))
355 ip.magic_autopx()
356 ip.magic_autopx()
356 sys.stdout = savestdout
357 sys.stdout = savestdout
357 sio.read()
358 sio.read()
358 output = sio.buf.strip()
359 output = sio.buf.strip()
359 self.assertTrue(output.startswith('%autopx enabled'))
360 self.assertTrue(output.startswith('%autopx enabled'))
360 self.assertTrue(output.endswith('%autopx disabled'))
361 self.assertTrue(output.endswith('%autopx disabled'))
361 self.assertTrue('RemoteError: ZeroDivisionError' in output)
362 self.assertTrue('RemoteError: ZeroDivisionError' in output)
362 ar = v.get_result(-2)
363 ar = v.get_result(-2)
363 self.assertEquals(v['a'], 5)
364 self.assertEquals(v['a'], 5)
364 self.assertEquals(v['b'], 20)
365 self.assertEquals(v['b'], 20)
365 self.assertRaisesRemote(ZeroDivisionError, ar.get)
366 self.assertRaisesRemote(ZeroDivisionError, ar.get)
366
367
367 def test_magic_autopx_nonblocking(self):
368 def test_magic_autopx_nonblocking(self):
368 ip = get_ipython()
369 ip = get_ipython()
369 v = self.client[-1]
370 v = self.client[-1]
370 v.activate()
371 v.activate()
371 v.block=False
372 v.block=False
372
373
373 sio = StringIO()
374 sio = StringIO()
374 savestdout = sys.stdout
375 savestdout = sys.stdout
375 sys.stdout = sio
376 sys.stdout = sio
376 ip.magic_autopx()
377 ip.magic_autopx()
377 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
378 ip.run_cell('\n'.join(('a=5','b=10','c=0')))
378 ip.run_cell('print b')
379 ip.run_cell('print b')
379 ip.run_cell("b/c")
380 ip.run_cell("b/c")
380 ip.run_code(compile('b*=2', '', 'single'))
381 ip.run_code(compile('b*=2', '', 'single'))
381 ip.magic_autopx()
382 ip.magic_autopx()
382 sys.stdout = savestdout
383 sys.stdout = savestdout
383 sio.read()
384 sio.read()
384 output = sio.buf.strip()
385 output = sio.buf.strip()
385 self.assertTrue(output.startswith('%autopx enabled'))
386 self.assertTrue(output.startswith('%autopx enabled'))
386 self.assertTrue(output.endswith('%autopx disabled'))
387 self.assertTrue(output.endswith('%autopx disabled'))
387 self.assertFalse('ZeroDivisionError' in output)
388 self.assertFalse('ZeroDivisionError' in output)
388 ar = v.get_result(-2)
389 ar = v.get_result(-2)
389 self.assertEquals(v['a'], 5)
390 self.assertEquals(v['a'], 5)
390 self.assertEquals(v['b'], 20)
391 self.assertEquals(v['b'], 20)
391 self.assertRaisesRemote(ZeroDivisionError, ar.get)
392 self.assertRaisesRemote(ZeroDivisionError, ar.get)
392
393
393 def test_magic_result(self):
394 def test_magic_result(self):
394 ip = get_ipython()
395 ip = get_ipython()
395 v = self.client[-1]
396 v = self.client[-1]
396 v.activate()
397 v.activate()
397 v['a'] = 111
398 v['a'] = 111
398 ra = v['a']
399 ra = v['a']
399
400
400 ar = ip.magic_result()
401 ar = ip.magic_result()
401 self.assertEquals(ar.msg_ids, [v.history[-1]])
402 self.assertEquals(ar.msg_ids, [v.history[-1]])
402 self.assertEquals(ar.get(), 111)
403 self.assertEquals(ar.get(), 111)
403 ar = ip.magic_result('-2')
404 ar = ip.magic_result('-2')
404 self.assertEquals(ar.msg_ids, [v.history[-2]])
405 self.assertEquals(ar.msg_ids, [v.history[-2]])
405
406
406 def test_unicode_execute(self):
407 def test_unicode_execute(self):
407 """test executing unicode strings"""
408 """test executing unicode strings"""
408 v = self.client[-1]
409 v = self.client[-1]
409 v.block=True
410 v.block=True
410 code=u"a=u'é'"
411 code=u"a=u'é'"
411 v.execute(code)
412 v.execute(code)
412 self.assertEquals(v['a'], u'é')
413 self.assertEquals(v['a'], u'é')
413
414
414 def test_unicode_apply_result(self):
415 def test_unicode_apply_result(self):
415 """test unicode apply results"""
416 """test unicode apply results"""
416 v = self.client[-1]
417 v = self.client[-1]
417 r = v.apply_sync(lambda : u'é')
418 r = v.apply_sync(lambda : u'é')
418 self.assertEquals(r, u'é')
419 self.assertEquals(r, u'é')
419
420
420 def test_unicode_apply_arg(self):
421 def test_unicode_apply_arg(self):
421 """test passing unicode arguments to apply"""
422 """test passing unicode arguments to apply"""
422 v = self.client[-1]
423 v = self.client[-1]
423
424
424 @interactive
425 @interactive
425 def check_unicode(a, check):
426 def check_unicode(a, check):
426 assert isinstance(a, unicode), "%r is not unicode"%a
427 assert isinstance(a, unicode), "%r is not unicode"%a
427 assert isinstance(check, bytes), "%r is not bytes"%check
428 assert isinstance(check, bytes), "%r is not bytes"%check
428 assert a.encode('utf8') == check, "%s != %s"%(a,check)
429 assert a.encode('utf8') == check, "%s != %s"%(a,check)
429
430
430 for s in [ u'é', u'ßø®∫','asdf'.decode() ]:
431 for s in [ u'é', u'ßø®∫','asdf'.decode() ]:
431 try:
432 try:
432 v.apply_sync(check_unicode, s, s.encode('utf8'))
433 v.apply_sync(check_unicode, s, s.encode('utf8'))
433 except error.RemoteError as e:
434 except error.RemoteError as e:
434 if e.ename == 'AssertionError':
435 if e.ename == 'AssertionError':
435 self.fail(e.evalue)
436 self.fail(e.evalue)
436 else:
437 else:
437 raise e
438 raise e
438
439
439
440
440
441
General Comments 0
You need to be logged in to leave comments. Login now