##// END OF EJS Templates
Merge pull request #2871 from minrk/shortexc...
Brian E. Granger -
r9725:c2bc10dd merge
parent child Browse files
Show More
@@ -1,346 +1,354 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """Classes and functions for kernel related errors and exceptions.
3 """Classes and functions for kernel related errors and exceptions.
4
4
5 Inheritance diagram:
5 Inheritance diagram:
6
6
7 .. inheritance-diagram:: IPython.parallel.error
7 .. inheritance-diagram:: IPython.parallel.error
8 :parts: 3
8 :parts: 3
9
9
10 Authors:
10 Authors:
11
11
12 * Brian Granger
12 * Brian Granger
13 * Min RK
13 * Min RK
14 """
14 """
15 from __future__ import print_function
15 from __future__ import print_function
16
16
17 import sys
17 import sys
18 import traceback
18 import traceback
19
19
20 __docformat__ = "restructuredtext en"
20 __docformat__ = "restructuredtext en"
21
21
22 # Tell nose to skip this module
22 # Tell nose to skip this module
23 __test__ = {}
23 __test__ = {}
24
24
25 #-------------------------------------------------------------------------------
25 #-------------------------------------------------------------------------------
26 # Copyright (C) 2008-2011 The IPython Development Team
26 # Copyright (C) 2008-2011 The IPython Development Team
27 #
27 #
28 # Distributed under the terms of the BSD License. The full license is in
28 # Distributed under the terms of the BSD License. The full license is in
29 # the file COPYING, distributed as part of this software.
29 # the file COPYING, distributed as part of this software.
30 #-------------------------------------------------------------------------------
30 #-------------------------------------------------------------------------------
31
31
32 #-------------------------------------------------------------------------------
32 #-------------------------------------------------------------------------------
33 # Error classes
33 # Error classes
34 #-------------------------------------------------------------------------------
34 #-------------------------------------------------------------------------------
35 class IPythonError(Exception):
35 class IPythonError(Exception):
36 """Base exception that all of our exceptions inherit from.
36 """Base exception that all of our exceptions inherit from.
37
37
38 This can be raised by code that doesn't have any more specific
38 This can be raised by code that doesn't have any more specific
39 information."""
39 information."""
40
40
41 pass
41 pass
42
42
43 # Exceptions associated with the controller objects
43 # Exceptions associated with the controller objects
44 class ControllerError(IPythonError): pass
44 class ControllerError(IPythonError): pass
45
45
46 class ControllerCreationError(ControllerError): pass
46 class ControllerCreationError(ControllerError): pass
47
47
48
48
49 # Exceptions associated with the Engines
49 # Exceptions associated with the Engines
50 class EngineError(IPythonError): pass
50 class EngineError(IPythonError): pass
51
51
52 class EngineCreationError(EngineError): pass
52 class EngineCreationError(EngineError): pass
53
53
54 class KernelError(IPythonError):
54 class KernelError(IPythonError):
55 pass
55 pass
56
56
57 class NotDefined(KernelError):
57 class NotDefined(KernelError):
58 def __init__(self, name):
58 def __init__(self, name):
59 self.name = name
59 self.name = name
60 self.args = (name,)
60 self.args = (name,)
61
61
62 def __repr__(self):
62 def __repr__(self):
63 return '<NotDefined: %s>' % self.name
63 return '<NotDefined: %s>' % self.name
64
64
65 __str__ = __repr__
65 __str__ = __repr__
66
66
67
67
68 class QueueCleared(KernelError):
68 class QueueCleared(KernelError):
69 pass
69 pass
70
70
71
71
72 class IdInUse(KernelError):
72 class IdInUse(KernelError):
73 pass
73 pass
74
74
75
75
76 class ProtocolError(KernelError):
76 class ProtocolError(KernelError):
77 pass
77 pass
78
78
79
79
80 class ConnectionError(KernelError):
80 class ConnectionError(KernelError):
81 pass
81 pass
82
82
83
83
84 class InvalidEngineID(KernelError):
84 class InvalidEngineID(KernelError):
85 pass
85 pass
86
86
87
87
88 class NoEnginesRegistered(KernelError):
88 class NoEnginesRegistered(KernelError):
89 pass
89 pass
90
90
91
91
92 class InvalidClientID(KernelError):
92 class InvalidClientID(KernelError):
93 pass
93 pass
94
94
95
95
96 class InvalidDeferredID(KernelError):
96 class InvalidDeferredID(KernelError):
97 pass
97 pass
98
98
99
99
100 class SerializationError(KernelError):
100 class SerializationError(KernelError):
101 pass
101 pass
102
102
103
103
104 class MessageSizeError(KernelError):
104 class MessageSizeError(KernelError):
105 pass
105 pass
106
106
107
107
108 class PBMessageSizeError(MessageSizeError):
108 class PBMessageSizeError(MessageSizeError):
109 pass
109 pass
110
110
111
111
112 class ResultNotCompleted(KernelError):
112 class ResultNotCompleted(KernelError):
113 pass
113 pass
114
114
115
115
116 class ResultAlreadyRetrieved(KernelError):
116 class ResultAlreadyRetrieved(KernelError):
117 pass
117 pass
118
118
119 class ClientError(KernelError):
119 class ClientError(KernelError):
120 pass
120 pass
121
121
122
122
123 class TaskAborted(KernelError):
123 class TaskAborted(KernelError):
124 pass
124 pass
125
125
126
126
127 class TaskTimeout(KernelError):
127 class TaskTimeout(KernelError):
128 pass
128 pass
129
129
130
130
131 class NotAPendingResult(KernelError):
131 class NotAPendingResult(KernelError):
132 pass
132 pass
133
133
134
134
135 class UnpickleableException(KernelError):
135 class UnpickleableException(KernelError):
136 pass
136 pass
137
137
138
138
139 class AbortedPendingDeferredError(KernelError):
139 class AbortedPendingDeferredError(KernelError):
140 pass
140 pass
141
141
142
142
143 class InvalidProperty(KernelError):
143 class InvalidProperty(KernelError):
144 pass
144 pass
145
145
146
146
147 class MissingBlockArgument(KernelError):
147 class MissingBlockArgument(KernelError):
148 pass
148 pass
149
149
150
150
151 class StopLocalExecution(KernelError):
151 class StopLocalExecution(KernelError):
152 pass
152 pass
153
153
154
154
155 class SecurityError(KernelError):
155 class SecurityError(KernelError):
156 pass
156 pass
157
157
158
158
159 class FileTimeoutError(KernelError):
159 class FileTimeoutError(KernelError):
160 pass
160 pass
161
161
162 class TimeoutError(KernelError):
162 class TimeoutError(KernelError):
163 pass
163 pass
164
164
165 class UnmetDependency(KernelError):
165 class UnmetDependency(KernelError):
166 pass
166 pass
167
167
168 class ImpossibleDependency(UnmetDependency):
168 class ImpossibleDependency(UnmetDependency):
169 pass
169 pass
170
170
171 class DependencyTimeout(ImpossibleDependency):
171 class DependencyTimeout(ImpossibleDependency):
172 pass
172 pass
173
173
174 class InvalidDependency(ImpossibleDependency):
174 class InvalidDependency(ImpossibleDependency):
175 pass
175 pass
176
176
177 class RemoteError(KernelError):
177 class RemoteError(KernelError):
178 """Error raised elsewhere"""
178 """Error raised elsewhere"""
179 ename=None
179 ename=None
180 evalue=None
180 evalue=None
181 traceback=None
181 traceback=None
182 engine_info=None
182 engine_info=None
183
183
184 def __init__(self, ename, evalue, traceback, engine_info=None):
184 def __init__(self, ename, evalue, traceback, engine_info=None):
185 self.ename=ename
185 self.ename=ename
186 self.evalue=evalue
186 self.evalue=evalue
187 self.traceback=traceback
187 self.traceback=traceback
188 self.engine_info=engine_info or {}
188 self.engine_info=engine_info or {}
189 self.args=(ename, evalue)
189 self.args=(ename, evalue)
190
190
191 def __repr__(self):
191 def __repr__(self):
192 engineid = self.engine_info.get('engine_id', ' ')
192 engineid = self.engine_info.get('engine_id', ' ')
193 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
193 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
194
194
195 def __str__(self):
195 def __str__(self):
196 return "%s(%s)" % (self.ename, self.evalue)
196 return "%s(%s)" % (self.ename, self.evalue)
197
197
198 def render_traceback(self):
198 def render_traceback(self):
199 """render traceback to a list of lines"""
199 """render traceback to a list of lines"""
200 return (self.traceback or "No traceback available").splitlines()
200 return (self.traceback or "No traceback available").splitlines()
201
201
202 def _render_traceback_(self):
202 def _render_traceback_(self):
203 """Special method for custom tracebacks within IPython.
203 """Special method for custom tracebacks within IPython.
204
204
205 This will be called by IPython instead of displaying the local traceback.
205 This will be called by IPython instead of displaying the local traceback.
206
206
207 It should return a traceback rendered as a list of lines.
207 It should return a traceback rendered as a list of lines.
208 """
208 """
209 return self.render_traceback()
209 return self.render_traceback()
210
210
211 def print_traceback(self, excid=None):
211 def print_traceback(self, excid=None):
212 """print my traceback"""
212 """print my traceback"""
213 print('\n'.join(self.render_traceback()))
213 print('\n'.join(self.render_traceback()))
214
214
215
215
216
216
217
217
218 class TaskRejectError(KernelError):
218 class TaskRejectError(KernelError):
219 """Exception to raise when a task should be rejected by an engine.
219 """Exception to raise when a task should be rejected by an engine.
220
220
221 This exception can be used to allow a task running on an engine to test
221 This exception can be used to allow a task running on an engine to test
222 if the engine (or the user's namespace on the engine) has the needed
222 if the engine (or the user's namespace on the engine) has the needed
223 task dependencies. If not, the task should raise this exception. For
223 task dependencies. If not, the task should raise this exception. For
224 the task to be retried on another engine, the task should be created
224 the task to be retried on another engine, the task should be created
225 with the `retries` argument > 1.
225 with the `retries` argument > 1.
226
226
227 The advantage of this approach over our older properties system is that
227 The advantage of this approach over our older properties system is that
228 tasks have full access to the user's namespace on the engines and the
228 tasks have full access to the user's namespace on the engines and the
229 properties don't have to be managed or tested by the controller.
229 properties don't have to be managed or tested by the controller.
230 """
230 """
231
231
232
232
233 class CompositeError(RemoteError):
233 class CompositeError(RemoteError):
234 """Error for representing possibly multiple errors on engines"""
234 """Error for representing possibly multiple errors on engines"""
235 tb_limit = 4 # limit on how many tracebacks to draw
236
235 def __init__(self, message, elist):
237 def __init__(self, message, elist):
236 Exception.__init__(self, *(message, elist))
238 Exception.__init__(self, *(message, elist))
237 # Don't use pack_exception because it will conflict with the .message
239 # Don't use pack_exception because it will conflict with the .message
238 # attribute that is being deprecated in 2.6 and beyond.
240 # attribute that is being deprecated in 2.6 and beyond.
239 self.msg = message
241 self.msg = message
240 self.elist = elist
242 self.elist = elist
241 self.args = [ e[0] for e in elist ]
243 self.args = [ e[0] for e in elist ]
242
244
243 def _get_engine_str(self, ei):
245 def _get_engine_str(self, ei):
244 if not ei:
246 if not ei:
245 return '[Engine Exception]'
247 return '[Engine Exception]'
246 else:
248 else:
247 return '[%s:%s]: ' % (ei['engine_id'], ei['method'])
249 return '[%s:%s]: ' % (ei['engine_id'], ei['method'])
248
250
249 def _get_traceback(self, ev):
251 def _get_traceback(self, ev):
250 try:
252 try:
251 tb = ev._ipython_traceback_text
253 tb = ev._ipython_traceback_text
252 except AttributeError:
254 except AttributeError:
253 return 'No traceback available'
255 return 'No traceback available'
254 else:
256 else:
255 return tb
257 return tb
256
258
257 def __str__(self):
259 def __str__(self):
258 s = str(self.msg)
260 s = str(self.msg)
259 for en, ev, etb, ei in self.elist:
261 for en, ev, etb, ei in self.elist[:self.tb_limit]:
260 engine_str = self._get_engine_str(ei)
262 engine_str = self._get_engine_str(ei)
261 s = s + '\n' + engine_str + en + ': ' + str(ev)
263 s = s + '\n' + engine_str + en + ': ' + str(ev)
264 if len(self.elist) > self.tb_limit:
265 s = s + '\n.... %i more exceptions ...' % (len(self.elist) - self.tb_limit)
262 return s
266 return s
263
267
264 def __repr__(self):
268 def __repr__(self):
265 return "CompositeError(%i)"%len(self.elist)
269 return "CompositeError(%i)" % len(self.elist)
266
270
267 def render_traceback(self, excid=None):
271 def render_traceback(self, excid=None):
268 """render one or all of my tracebacks to a list of lines"""
272 """render one or all of my tracebacks to a list of lines"""
269 lines = []
273 lines = []
270 if excid is None:
274 if excid is None:
271 for (en,ev,etb,ei) in self.elist:
275 for (en,ev,etb,ei) in self.elist[:self.tb_limit]:
272 lines.append(self._get_engine_str(ei))
276 lines.append(self._get_engine_str(ei))
273 lines.extend((etb or 'No traceback available').splitlines())
277 lines.extend((etb or 'No traceback available').splitlines())
274 lines.append('')
278 lines.append('')
279 if len(self.elist) > self.tb_limit:
280 lines.append(
281 '... %i more exceptions ...' % (len(self.elist) - self.tb_limit)
282 )
275 else:
283 else:
276 try:
284 try:
277 en,ev,etb,ei = self.elist[excid]
285 en,ev,etb,ei = self.elist[excid]
278 except:
286 except:
279 raise IndexError("an exception with index %i does not exist"%excid)
287 raise IndexError("an exception with index %i does not exist"%excid)
280 else:
288 else:
281 lines.append(self._get_engine_str(ei))
289 lines.append(self._get_engine_str(ei))
282 lines.extend((etb or 'No traceback available').splitlines())
290 lines.extend((etb or 'No traceback available').splitlines())
283
291
284 return lines
292 return lines
285
293
286 def print_traceback(self, excid=None):
294 def print_traceback(self, excid=None):
287 print('\n'.join(self.render_traceback(excid)))
295 print('\n'.join(self.render_traceback(excid)))
288
296
289 def raise_exception(self, excid=0):
297 def raise_exception(self, excid=0):
290 try:
298 try:
291 en,ev,etb,ei = self.elist[excid]
299 en,ev,etb,ei = self.elist[excid]
292 except:
300 except:
293 raise IndexError("an exception with index %i does not exist"%excid)
301 raise IndexError("an exception with index %i does not exist"%excid)
294 else:
302 else:
295 raise RemoteError(en, ev, etb, ei)
303 raise RemoteError(en, ev, etb, ei)
296
304
297
305
298 def collect_exceptions(rdict_or_list, method='unspecified'):
306 def collect_exceptions(rdict_or_list, method='unspecified'):
299 """check a result dict for errors, and raise CompositeError if any exist.
307 """check a result dict for errors, and raise CompositeError if any exist.
300 Passthrough otherwise."""
308 Passthrough otherwise."""
301 elist = []
309 elist = []
302 if isinstance(rdict_or_list, dict):
310 if isinstance(rdict_or_list, dict):
303 rlist = rdict_or_list.values()
311 rlist = rdict_or_list.values()
304 else:
312 else:
305 rlist = rdict_or_list
313 rlist = rdict_or_list
306 for r in rlist:
314 for r in rlist:
307 if isinstance(r, RemoteError):
315 if isinstance(r, RemoteError):
308 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
316 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
309 # Sometimes we could have CompositeError in our list. Just take
317 # Sometimes we could have CompositeError in our list. Just take
310 # the errors out of them and put them in our new list. This
318 # the errors out of them and put them in our new list. This
311 # has the effect of flattening lists of CompositeErrors into one
319 # has the effect of flattening lists of CompositeErrors into one
312 # CompositeError
320 # CompositeError
313 if en=='CompositeError':
321 if en=='CompositeError':
314 for e in ev.elist:
322 for e in ev.elist:
315 elist.append(e)
323 elist.append(e)
316 else:
324 else:
317 elist.append((en, ev, etb, ei))
325 elist.append((en, ev, etb, ei))
318 if len(elist)==0:
326 if len(elist)==0:
319 return rdict_or_list
327 return rdict_or_list
320 else:
328 else:
321 msg = "one or more exceptions from call to method: %s" % (method)
329 msg = "one or more exceptions from call to method: %s" % (method)
322 # This silliness is needed so the debugger has access to the exception
330 # This silliness is needed so the debugger has access to the exception
323 # instance (e in this case)
331 # instance (e in this case)
324 try:
332 try:
325 raise CompositeError(msg, elist)
333 raise CompositeError(msg, elist)
326 except CompositeError as e:
334 except CompositeError as e:
327 raise e
335 raise e
328
336
329 def wrap_exception(engine_info={}):
337 def wrap_exception(engine_info={}):
330 etype, evalue, tb = sys.exc_info()
338 etype, evalue, tb = sys.exc_info()
331 stb = traceback.format_exception(etype, evalue, tb)
339 stb = traceback.format_exception(etype, evalue, tb)
332 exc_content = {
340 exc_content = {
333 'status' : 'error',
341 'status' : 'error',
334 'traceback' : stb,
342 'traceback' : stb,
335 'ename' : unicode(etype.__name__),
343 'ename' : unicode(etype.__name__),
336 'evalue' : unicode(evalue),
344 'evalue' : unicode(evalue),
337 'engine_info' : engine_info
345 'engine_info' : engine_info
338 }
346 }
339 return exc_content
347 return exc_content
340
348
341 def unwrap_exception(content):
349 def unwrap_exception(content):
342 err = RemoteError(content['ename'], content['evalue'],
350 err = RemoteError(content['ename'], content['evalue'],
343 ''.join(content['traceback']),
351 ''.join(content['traceback']),
344 content.get('engine_info', {}))
352 content.get('engine_info', {}))
345 return err
353 return err
346
354
@@ -1,759 +1,786 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import platform
20 import platform
21 import time
21 import time
22 from collections import namedtuple
22 from collections import namedtuple
23 from tempfile import mktemp
23 from tempfile import mktemp
24 from StringIO import StringIO
24 from StringIO import StringIO
25
25
26 import zmq
26 import zmq
27 from nose import SkipTest
27 from nose import SkipTest
28 from nose.plugins.attrib import attr
28 from nose.plugins.attrib import attr
29
29
30 from IPython.testing import decorators as dec
30 from IPython.testing import decorators as dec
31 from IPython.testing.ipunittest import ParametricTestCase
31 from IPython.testing.ipunittest import ParametricTestCase
32 from IPython.utils.io import capture_output
32 from IPython.utils.io import capture_output
33
33
34 from IPython import parallel as pmod
34 from IPython import parallel as pmod
35 from IPython.parallel import error
35 from IPython.parallel import error
36 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
36 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
37 from IPython.parallel import DirectView
37 from IPython.parallel import DirectView
38 from IPython.parallel.util import interactive
38 from IPython.parallel.util import interactive
39
39
40 from IPython.parallel.tests import add_engines
40 from IPython.parallel.tests import add_engines
41
41
42 from .clienttest import ClusterTestCase, crash, wait, skip_without
42 from .clienttest import ClusterTestCase, crash, wait, skip_without
43
43
44 def setup():
44 def setup():
45 add_engines(3, total=True)
45 add_engines(3, total=True)
46
46
47 point = namedtuple("point", "x y")
47 point = namedtuple("point", "x y")
48
48
49 class TestView(ClusterTestCase, ParametricTestCase):
49 class TestView(ClusterTestCase, ParametricTestCase):
50
50
51 def setUp(self):
51 def setUp(self):
52 # On Win XP, wait for resource cleanup, else parallel test group fails
52 # On Win XP, wait for resource cleanup, else parallel test group fails
53 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
53 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
54 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
54 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
55 time.sleep(2)
55 time.sleep(2)
56 super(TestView, self).setUp()
56 super(TestView, self).setUp()
57
57
58 @attr('crash')
58 @attr('crash')
59 def test_z_crash_mux(self):
59 def test_z_crash_mux(self):
60 """test graceful handling of engine death (direct)"""
60 """test graceful handling of engine death (direct)"""
61 # self.add_engines(1)
61 # self.add_engines(1)
62 eid = self.client.ids[-1]
62 eid = self.client.ids[-1]
63 ar = self.client[eid].apply_async(crash)
63 ar = self.client[eid].apply_async(crash)
64 self.assertRaisesRemote(error.EngineError, ar.get, 10)
64 self.assertRaisesRemote(error.EngineError, ar.get, 10)
65 eid = ar.engine_id
65 eid = ar.engine_id
66 tic = time.time()
66 tic = time.time()
67 while eid in self.client.ids and time.time()-tic < 5:
67 while eid in self.client.ids and time.time()-tic < 5:
68 time.sleep(.01)
68 time.sleep(.01)
69 self.client.spin()
69 self.client.spin()
70 self.assertFalse(eid in self.client.ids, "Engine should have died")
70 self.assertFalse(eid in self.client.ids, "Engine should have died")
71
71
72 def test_push_pull(self):
72 def test_push_pull(self):
73 """test pushing and pulling"""
73 """test pushing and pulling"""
74 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
74 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
75 t = self.client.ids[-1]
75 t = self.client.ids[-1]
76 v = self.client[t]
76 v = self.client[t]
77 push = v.push
77 push = v.push
78 pull = v.pull
78 pull = v.pull
79 v.block=True
79 v.block=True
80 nengines = len(self.client)
80 nengines = len(self.client)
81 push({'data':data})
81 push({'data':data})
82 d = pull('data')
82 d = pull('data')
83 self.assertEqual(d, data)
83 self.assertEqual(d, data)
84 self.client[:].push({'data':data})
84 self.client[:].push({'data':data})
85 d = self.client[:].pull('data', block=True)
85 d = self.client[:].pull('data', block=True)
86 self.assertEqual(d, nengines*[data])
86 self.assertEqual(d, nengines*[data])
87 ar = push({'data':data}, block=False)
87 ar = push({'data':data}, block=False)
88 self.assertTrue(isinstance(ar, AsyncResult))
88 self.assertTrue(isinstance(ar, AsyncResult))
89 r = ar.get()
89 r = ar.get()
90 ar = self.client[:].pull('data', block=False)
90 ar = self.client[:].pull('data', block=False)
91 self.assertTrue(isinstance(ar, AsyncResult))
91 self.assertTrue(isinstance(ar, AsyncResult))
92 r = ar.get()
92 r = ar.get()
93 self.assertEqual(r, nengines*[data])
93 self.assertEqual(r, nengines*[data])
94 self.client[:].push(dict(a=10,b=20))
94 self.client[:].push(dict(a=10,b=20))
95 r = self.client[:].pull(('a','b'), block=True)
95 r = self.client[:].pull(('a','b'), block=True)
96 self.assertEqual(r, nengines*[[10,20]])
96 self.assertEqual(r, nengines*[[10,20]])
97
97
98 def test_push_pull_function(self):
98 def test_push_pull_function(self):
99 "test pushing and pulling functions"
99 "test pushing and pulling functions"
100 def testf(x):
100 def testf(x):
101 return 2.0*x
101 return 2.0*x
102
102
103 t = self.client.ids[-1]
103 t = self.client.ids[-1]
104 v = self.client[t]
104 v = self.client[t]
105 v.block=True
105 v.block=True
106 push = v.push
106 push = v.push
107 pull = v.pull
107 pull = v.pull
108 execute = v.execute
108 execute = v.execute
109 push({'testf':testf})
109 push({'testf':testf})
110 r = pull('testf')
110 r = pull('testf')
111 self.assertEqual(r(1.0), testf(1.0))
111 self.assertEqual(r(1.0), testf(1.0))
112 execute('r = testf(10)')
112 execute('r = testf(10)')
113 r = pull('r')
113 r = pull('r')
114 self.assertEqual(r, testf(10))
114 self.assertEqual(r, testf(10))
115 ar = self.client[:].push({'testf':testf}, block=False)
115 ar = self.client[:].push({'testf':testf}, block=False)
116 ar.get()
116 ar.get()
117 ar = self.client[:].pull('testf', block=False)
117 ar = self.client[:].pull('testf', block=False)
118 rlist = ar.get()
118 rlist = ar.get()
119 for r in rlist:
119 for r in rlist:
120 self.assertEqual(r(1.0), testf(1.0))
120 self.assertEqual(r(1.0), testf(1.0))
121 execute("def g(x): return x*x")
121 execute("def g(x): return x*x")
122 r = pull(('testf','g'))
122 r = pull(('testf','g'))
123 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
123 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
124
124
125 def test_push_function_globals(self):
125 def test_push_function_globals(self):
126 """test that pushed functions have access to globals"""
126 """test that pushed functions have access to globals"""
127 @interactive
127 @interactive
128 def geta():
128 def geta():
129 return a
129 return a
130 # self.add_engines(1)
130 # self.add_engines(1)
131 v = self.client[-1]
131 v = self.client[-1]
132 v.block=True
132 v.block=True
133 v['f'] = geta
133 v['f'] = geta
134 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
134 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
135 v.execute('a=5')
135 v.execute('a=5')
136 v.execute('b=f()')
136 v.execute('b=f()')
137 self.assertEqual(v['b'], 5)
137 self.assertEqual(v['b'], 5)
138
138
139 def test_push_function_defaults(self):
139 def test_push_function_defaults(self):
140 """test that pushed functions preserve default args"""
140 """test that pushed functions preserve default args"""
141 def echo(a=10):
141 def echo(a=10):
142 return a
142 return a
143 v = self.client[-1]
143 v = self.client[-1]
144 v.block=True
144 v.block=True
145 v['f'] = echo
145 v['f'] = echo
146 v.execute('b=f()')
146 v.execute('b=f()')
147 self.assertEqual(v['b'], 10)
147 self.assertEqual(v['b'], 10)
148
148
149 def test_get_result(self):
149 def test_get_result(self):
150 """test getting results from the Hub."""
150 """test getting results from the Hub."""
151 c = pmod.Client(profile='iptest')
151 c = pmod.Client(profile='iptest')
152 # self.add_engines(1)
152 # self.add_engines(1)
153 t = c.ids[-1]
153 t = c.ids[-1]
154 v = c[t]
154 v = c[t]
155 v2 = self.client[t]
155 v2 = self.client[t]
156 ar = v.apply_async(wait, 1)
156 ar = v.apply_async(wait, 1)
157 # give the monitor time to notice the message
157 # give the monitor time to notice the message
158 time.sleep(.25)
158 time.sleep(.25)
159 ahr = v2.get_result(ar.msg_ids)
159 ahr = v2.get_result(ar.msg_ids)
160 self.assertTrue(isinstance(ahr, AsyncHubResult))
160 self.assertTrue(isinstance(ahr, AsyncHubResult))
161 self.assertEqual(ahr.get(), ar.get())
161 self.assertEqual(ahr.get(), ar.get())
162 ar2 = v2.get_result(ar.msg_ids)
162 ar2 = v2.get_result(ar.msg_ids)
163 self.assertFalse(isinstance(ar2, AsyncHubResult))
163 self.assertFalse(isinstance(ar2, AsyncHubResult))
164 c.spin()
164 c.spin()
165 c.close()
165 c.close()
166
166
167 def test_run_newline(self):
167 def test_run_newline(self):
168 """test that run appends newline to files"""
168 """test that run appends newline to files"""
169 tmpfile = mktemp()
169 tmpfile = mktemp()
170 with open(tmpfile, 'w') as f:
170 with open(tmpfile, 'w') as f:
171 f.write("""def g():
171 f.write("""def g():
172 return 5
172 return 5
173 """)
173 """)
174 v = self.client[-1]
174 v = self.client[-1]
175 v.run(tmpfile, block=True)
175 v.run(tmpfile, block=True)
176 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
176 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
177
177
178 def test_apply_tracked(self):
178 def test_apply_tracked(self):
179 """test tracking for apply"""
179 """test tracking for apply"""
180 # self.add_engines(1)
180 # self.add_engines(1)
181 t = self.client.ids[-1]
181 t = self.client.ids[-1]
182 v = self.client[t]
182 v = self.client[t]
183 v.block=False
183 v.block=False
184 def echo(n=1024*1024, **kwargs):
184 def echo(n=1024*1024, **kwargs):
185 with v.temp_flags(**kwargs):
185 with v.temp_flags(**kwargs):
186 return v.apply(lambda x: x, 'x'*n)
186 return v.apply(lambda x: x, 'x'*n)
187 ar = echo(1, track=False)
187 ar = echo(1, track=False)
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertTrue(ar.sent)
189 self.assertTrue(ar.sent)
190 ar = echo(track=True)
190 ar = echo(track=True)
191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
191 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
192 self.assertEqual(ar.sent, ar._tracker.done)
192 self.assertEqual(ar.sent, ar._tracker.done)
193 ar._tracker.wait()
193 ar._tracker.wait()
194 self.assertTrue(ar.sent)
194 self.assertTrue(ar.sent)
195
195
196 def test_push_tracked(self):
196 def test_push_tracked(self):
197 t = self.client.ids[-1]
197 t = self.client.ids[-1]
198 ns = dict(x='x'*1024*1024)
198 ns = dict(x='x'*1024*1024)
199 v = self.client[t]
199 v = self.client[t]
200 ar = v.push(ns, block=False, track=False)
200 ar = v.push(ns, block=False, track=False)
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
201 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(ar.sent)
202 self.assertTrue(ar.sent)
203
203
204 ar = v.push(ns, block=False, track=True)
204 ar = v.push(ns, block=False, track=True)
205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
205 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
206 ar._tracker.wait()
206 ar._tracker.wait()
207 self.assertEqual(ar.sent, ar._tracker.done)
207 self.assertEqual(ar.sent, ar._tracker.done)
208 self.assertTrue(ar.sent)
208 self.assertTrue(ar.sent)
209 ar.get()
209 ar.get()
210
210
211 def test_scatter_tracked(self):
211 def test_scatter_tracked(self):
212 t = self.client.ids
212 t = self.client.ids
213 x='x'*1024*1024
213 x='x'*1024*1024
214 ar = self.client[t].scatter('x', x, block=False, track=False)
214 ar = self.client[t].scatter('x', x, block=False, track=False)
215 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
215 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
216 self.assertTrue(ar.sent)
216 self.assertTrue(ar.sent)
217
217
218 ar = self.client[t].scatter('x', x, block=False, track=True)
218 ar = self.client[t].scatter('x', x, block=False, track=True)
219 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
219 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
220 self.assertEqual(ar.sent, ar._tracker.done)
220 self.assertEqual(ar.sent, ar._tracker.done)
221 ar._tracker.wait()
221 ar._tracker.wait()
222 self.assertTrue(ar.sent)
222 self.assertTrue(ar.sent)
223 ar.get()
223 ar.get()
224
224
225 def test_remote_reference(self):
225 def test_remote_reference(self):
226 v = self.client[-1]
226 v = self.client[-1]
227 v['a'] = 123
227 v['a'] = 123
228 ra = pmod.Reference('a')
228 ra = pmod.Reference('a')
229 b = v.apply_sync(lambda x: x, ra)
229 b = v.apply_sync(lambda x: x, ra)
230 self.assertEqual(b, 123)
230 self.assertEqual(b, 123)
231
231
232
232
233 def test_scatter_gather(self):
233 def test_scatter_gather(self):
234 view = self.client[:]
234 view = self.client[:]
235 seq1 = range(16)
235 seq1 = range(16)
236 view.scatter('a', seq1)
236 view.scatter('a', seq1)
237 seq2 = view.gather('a', block=True)
237 seq2 = view.gather('a', block=True)
238 self.assertEqual(seq2, seq1)
238 self.assertEqual(seq2, seq1)
239 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
239 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
240
240
241 @skip_without('numpy')
241 @skip_without('numpy')
242 def test_scatter_gather_numpy(self):
242 def test_scatter_gather_numpy(self):
243 import numpy
243 import numpy
244 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
244 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
245 view = self.client[:]
245 view = self.client[:]
246 a = numpy.arange(64)
246 a = numpy.arange(64)
247 view.scatter('a', a, block=True)
247 view.scatter('a', a, block=True)
248 b = view.gather('a', block=True)
248 b = view.gather('a', block=True)
249 assert_array_equal(b, a)
249 assert_array_equal(b, a)
250
250
251 def test_scatter_gather_lazy(self):
251 def test_scatter_gather_lazy(self):
252 """scatter/gather with targets='all'"""
252 """scatter/gather with targets='all'"""
253 view = self.client.direct_view(targets='all')
253 view = self.client.direct_view(targets='all')
254 x = range(64)
254 x = range(64)
255 view.scatter('x', x)
255 view.scatter('x', x)
256 gathered = view.gather('x', block=True)
256 gathered = view.gather('x', block=True)
257 self.assertEqual(gathered, x)
257 self.assertEqual(gathered, x)
258
258
259
259
260 @dec.known_failure_py3
260 @dec.known_failure_py3
261 @skip_without('numpy')
261 @skip_without('numpy')
262 def test_push_numpy_nocopy(self):
262 def test_push_numpy_nocopy(self):
263 import numpy
263 import numpy
264 view = self.client[:]
264 view = self.client[:]
265 a = numpy.arange(64)
265 a = numpy.arange(64)
266 view['A'] = a
266 view['A'] = a
267 @interactive
267 @interactive
268 def check_writeable(x):
268 def check_writeable(x):
269 return x.flags.writeable
269 return x.flags.writeable
270
270
271 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
271 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
272 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
272 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
273
273
274 view.push(dict(B=a))
274 view.push(dict(B=a))
275 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
275 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
276 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
276 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
277
277
278 @skip_without('numpy')
278 @skip_without('numpy')
279 def test_apply_numpy(self):
279 def test_apply_numpy(self):
280 """view.apply(f, ndarray)"""
280 """view.apply(f, ndarray)"""
281 import numpy
281 import numpy
282 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
282 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
283
283
284 A = numpy.random.random((100,100))
284 A = numpy.random.random((100,100))
285 view = self.client[-1]
285 view = self.client[-1]
286 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
286 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
287 B = A.astype(dt)
287 B = A.astype(dt)
288 C = view.apply_sync(lambda x:x, B)
288 C = view.apply_sync(lambda x:x, B)
289 assert_array_equal(B,C)
289 assert_array_equal(B,C)
290
290
291 @skip_without('numpy')
291 @skip_without('numpy')
292 def test_push_pull_recarray(self):
292 def test_push_pull_recarray(self):
293 """push/pull recarrays"""
293 """push/pull recarrays"""
294 import numpy
294 import numpy
295 from numpy.testing.utils import assert_array_equal
295 from numpy.testing.utils import assert_array_equal
296
296
297 view = self.client[-1]
297 view = self.client[-1]
298
298
299 R = numpy.array([
299 R = numpy.array([
300 (1, 'hi', 0.),
300 (1, 'hi', 0.),
301 (2**30, 'there', 2.5),
301 (2**30, 'there', 2.5),
302 (-99999, 'world', -12345.6789),
302 (-99999, 'world', -12345.6789),
303 ], [('n', int), ('s', '|S10'), ('f', float)])
303 ], [('n', int), ('s', '|S10'), ('f', float)])
304
304
305 view['RR'] = R
305 view['RR'] = R
306 R2 = view['RR']
306 R2 = view['RR']
307
307
308 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
308 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
309 self.assertEqual(r_dtype, R.dtype)
309 self.assertEqual(r_dtype, R.dtype)
310 self.assertEqual(r_shape, R.shape)
310 self.assertEqual(r_shape, R.shape)
311 self.assertEqual(R2.dtype, R.dtype)
311 self.assertEqual(R2.dtype, R.dtype)
312 self.assertEqual(R2.shape, R.shape)
312 self.assertEqual(R2.shape, R.shape)
313 assert_array_equal(R2, R)
313 assert_array_equal(R2, R)
314
314
315 @skip_without('pandas')
315 @skip_without('pandas')
316 def test_push_pull_timeseries(self):
316 def test_push_pull_timeseries(self):
317 """push/pull pandas.TimeSeries"""
317 """push/pull pandas.TimeSeries"""
318 import pandas
318 import pandas
319
319
320 ts = pandas.TimeSeries(range(10))
320 ts = pandas.TimeSeries(range(10))
321
321
322 view = self.client[-1]
322 view = self.client[-1]
323
323
324 view.push(dict(ts=ts), block=True)
324 view.push(dict(ts=ts), block=True)
325 rts = view['ts']
325 rts = view['ts']
326
326
327 self.assertEqual(type(rts), type(ts))
327 self.assertEqual(type(rts), type(ts))
328 self.assertTrue((ts == rts).all())
328 self.assertTrue((ts == rts).all())
329
329
330 def test_map(self):
330 def test_map(self):
331 view = self.client[:]
331 view = self.client[:]
332 def f(x):
332 def f(x):
333 return x**2
333 return x**2
334 data = range(16)
334 data = range(16)
335 r = view.map_sync(f, data)
335 r = view.map_sync(f, data)
336 self.assertEqual(r, map(f, data))
336 self.assertEqual(r, map(f, data))
337
337
338 def test_map_iterable(self):
338 def test_map_iterable(self):
339 """test map on iterables (direct)"""
339 """test map on iterables (direct)"""
340 view = self.client[:]
340 view = self.client[:]
341 # 101 is prime, so it won't be evenly distributed
341 # 101 is prime, so it won't be evenly distributed
342 arr = range(101)
342 arr = range(101)
343 # ensure it will be an iterator, even in Python 3
343 # ensure it will be an iterator, even in Python 3
344 it = iter(arr)
344 it = iter(arr)
345 r = view.map_sync(lambda x:x, arr)
345 r = view.map_sync(lambda x:x, arr)
346 self.assertEqual(r, list(arr))
346 self.assertEqual(r, list(arr))
347
347
348 def test_scatter_gather_nonblocking(self):
348 def test_scatter_gather_nonblocking(self):
349 data = range(16)
349 data = range(16)
350 view = self.client[:]
350 view = self.client[:]
351 view.scatter('a', data, block=False)
351 view.scatter('a', data, block=False)
352 ar = view.gather('a', block=False)
352 ar = view.gather('a', block=False)
353 self.assertEqual(ar.get(), data)
353 self.assertEqual(ar.get(), data)
354
354
355 @skip_without('numpy')
355 @skip_without('numpy')
356 def test_scatter_gather_numpy_nonblocking(self):
356 def test_scatter_gather_numpy_nonblocking(self):
357 import numpy
357 import numpy
358 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
358 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
359 a = numpy.arange(64)
359 a = numpy.arange(64)
360 view = self.client[:]
360 view = self.client[:]
361 ar = view.scatter('a', a, block=False)
361 ar = view.scatter('a', a, block=False)
362 self.assertTrue(isinstance(ar, AsyncResult))
362 self.assertTrue(isinstance(ar, AsyncResult))
363 amr = view.gather('a', block=False)
363 amr = view.gather('a', block=False)
364 self.assertTrue(isinstance(amr, AsyncMapResult))
364 self.assertTrue(isinstance(amr, AsyncMapResult))
365 assert_array_equal(amr.get(), a)
365 assert_array_equal(amr.get(), a)
366
366
367 def test_execute(self):
367 def test_execute(self):
368 view = self.client[:]
368 view = self.client[:]
369 # self.client.debug=True
369 # self.client.debug=True
370 execute = view.execute
370 execute = view.execute
371 ar = execute('c=30', block=False)
371 ar = execute('c=30', block=False)
372 self.assertTrue(isinstance(ar, AsyncResult))
372 self.assertTrue(isinstance(ar, AsyncResult))
373 ar = execute('d=[0,1,2]', block=False)
373 ar = execute('d=[0,1,2]', block=False)
374 self.client.wait(ar, 1)
374 self.client.wait(ar, 1)
375 self.assertEqual(len(ar.get()), len(self.client))
375 self.assertEqual(len(ar.get()), len(self.client))
376 for c in view['c']:
376 for c in view['c']:
377 self.assertEqual(c, 30)
377 self.assertEqual(c, 30)
378
378
379 def test_abort(self):
379 def test_abort(self):
380 view = self.client[-1]
380 view = self.client[-1]
381 ar = view.execute('import time; time.sleep(1)', block=False)
381 ar = view.execute('import time; time.sleep(1)', block=False)
382 ar2 = view.apply_async(lambda : 2)
382 ar2 = view.apply_async(lambda : 2)
383 ar3 = view.apply_async(lambda : 3)
383 ar3 = view.apply_async(lambda : 3)
384 view.abort(ar2)
384 view.abort(ar2)
385 view.abort(ar3.msg_ids)
385 view.abort(ar3.msg_ids)
386 self.assertRaises(error.TaskAborted, ar2.get)
386 self.assertRaises(error.TaskAborted, ar2.get)
387 self.assertRaises(error.TaskAborted, ar3.get)
387 self.assertRaises(error.TaskAborted, ar3.get)
388
388
389 def test_abort_all(self):
389 def test_abort_all(self):
390 """view.abort() aborts all outstanding tasks"""
390 """view.abort() aborts all outstanding tasks"""
391 view = self.client[-1]
391 view = self.client[-1]
392 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
392 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
393 view.abort()
393 view.abort()
394 view.wait(timeout=5)
394 view.wait(timeout=5)
395 for ar in ars[5:]:
395 for ar in ars[5:]:
396 self.assertRaises(error.TaskAborted, ar.get)
396 self.assertRaises(error.TaskAborted, ar.get)
397
397
398 def test_temp_flags(self):
398 def test_temp_flags(self):
399 view = self.client[-1]
399 view = self.client[-1]
400 view.block=True
400 view.block=True
401 with view.temp_flags(block=False):
401 with view.temp_flags(block=False):
402 self.assertFalse(view.block)
402 self.assertFalse(view.block)
403 self.assertTrue(view.block)
403 self.assertTrue(view.block)
404
404
405 @dec.known_failure_py3
405 @dec.known_failure_py3
406 def test_importer(self):
406 def test_importer(self):
407 view = self.client[-1]
407 view = self.client[-1]
408 view.clear(block=True)
408 view.clear(block=True)
409 with view.importer:
409 with view.importer:
410 import re
410 import re
411
411
412 @interactive
412 @interactive
413 def findall(pat, s):
413 def findall(pat, s):
414 # this globals() step isn't necessary in real code
414 # this globals() step isn't necessary in real code
415 # only to prevent a closure in the test
415 # only to prevent a closure in the test
416 re = globals()['re']
416 re = globals()['re']
417 return re.findall(pat, s)
417 return re.findall(pat, s)
418
418
419 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
419 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
420
420
421 def test_unicode_execute(self):
421 def test_unicode_execute(self):
422 """test executing unicode strings"""
422 """test executing unicode strings"""
423 v = self.client[-1]
423 v = self.client[-1]
424 v.block=True
424 v.block=True
425 if sys.version_info[0] >= 3:
425 if sys.version_info[0] >= 3:
426 code="a='é'"
426 code="a='é'"
427 else:
427 else:
428 code=u"a=u'é'"
428 code=u"a=u'é'"
429 v.execute(code)
429 v.execute(code)
430 self.assertEqual(v['a'], u'é')
430 self.assertEqual(v['a'], u'é')
431
431
432 def test_unicode_apply_result(self):
432 def test_unicode_apply_result(self):
433 """test unicode apply results"""
433 """test unicode apply results"""
434 v = self.client[-1]
434 v = self.client[-1]
435 r = v.apply_sync(lambda : u'é')
435 r = v.apply_sync(lambda : u'é')
436 self.assertEqual(r, u'é')
436 self.assertEqual(r, u'é')
437
437
438 def test_unicode_apply_arg(self):
438 def test_unicode_apply_arg(self):
439 """test passing unicode arguments to apply"""
439 """test passing unicode arguments to apply"""
440 v = self.client[-1]
440 v = self.client[-1]
441
441
442 @interactive
442 @interactive
443 def check_unicode(a, check):
443 def check_unicode(a, check):
444 assert isinstance(a, unicode), "%r is not unicode"%a
444 assert isinstance(a, unicode), "%r is not unicode"%a
445 assert isinstance(check, bytes), "%r is not bytes"%check
445 assert isinstance(check, bytes), "%r is not bytes"%check
446 assert a.encode('utf8') == check, "%s != %s"%(a,check)
446 assert a.encode('utf8') == check, "%s != %s"%(a,check)
447
447
448 for s in [ u'é', u'ßø®∫',u'asdf' ]:
448 for s in [ u'é', u'ßø®∫',u'asdf' ]:
449 try:
449 try:
450 v.apply_sync(check_unicode, s, s.encode('utf8'))
450 v.apply_sync(check_unicode, s, s.encode('utf8'))
451 except error.RemoteError as e:
451 except error.RemoteError as e:
452 if e.ename == 'AssertionError':
452 if e.ename == 'AssertionError':
453 self.fail(e.evalue)
453 self.fail(e.evalue)
454 else:
454 else:
455 raise e
455 raise e
456
456
457 def test_map_reference(self):
457 def test_map_reference(self):
458 """view.map(<Reference>, *seqs) should work"""
458 """view.map(<Reference>, *seqs) should work"""
459 v = self.client[:]
459 v = self.client[:]
460 v.scatter('n', self.client.ids, flatten=True)
460 v.scatter('n', self.client.ids, flatten=True)
461 v.execute("f = lambda x,y: x*y")
461 v.execute("f = lambda x,y: x*y")
462 rf = pmod.Reference('f')
462 rf = pmod.Reference('f')
463 nlist = list(range(10))
463 nlist = list(range(10))
464 mlist = nlist[::-1]
464 mlist = nlist[::-1]
465 expected = [ m*n for m,n in zip(mlist, nlist) ]
465 expected = [ m*n for m,n in zip(mlist, nlist) ]
466 result = v.map_sync(rf, mlist, nlist)
466 result = v.map_sync(rf, mlist, nlist)
467 self.assertEqual(result, expected)
467 self.assertEqual(result, expected)
468
468
469 def test_apply_reference(self):
469 def test_apply_reference(self):
470 """view.apply(<Reference>, *args) should work"""
470 """view.apply(<Reference>, *args) should work"""
471 v = self.client[:]
471 v = self.client[:]
472 v.scatter('n', self.client.ids, flatten=True)
472 v.scatter('n', self.client.ids, flatten=True)
473 v.execute("f = lambda x: n*x")
473 v.execute("f = lambda x: n*x")
474 rf = pmod.Reference('f')
474 rf = pmod.Reference('f')
475 result = v.apply_sync(rf, 5)
475 result = v.apply_sync(rf, 5)
476 expected = [ 5*id for id in self.client.ids ]
476 expected = [ 5*id for id in self.client.ids ]
477 self.assertEqual(result, expected)
477 self.assertEqual(result, expected)
478
478
479 def test_eval_reference(self):
479 def test_eval_reference(self):
480 v = self.client[self.client.ids[0]]
480 v = self.client[self.client.ids[0]]
481 v['g'] = range(5)
481 v['g'] = range(5)
482 rg = pmod.Reference('g[0]')
482 rg = pmod.Reference('g[0]')
483 echo = lambda x:x
483 echo = lambda x:x
484 self.assertEqual(v.apply_sync(echo, rg), 0)
484 self.assertEqual(v.apply_sync(echo, rg), 0)
485
485
486 def test_reference_nameerror(self):
486 def test_reference_nameerror(self):
487 v = self.client[self.client.ids[0]]
487 v = self.client[self.client.ids[0]]
488 r = pmod.Reference('elvis_has_left')
488 r = pmod.Reference('elvis_has_left')
489 echo = lambda x:x
489 echo = lambda x:x
490 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
490 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
491
491
492 def test_single_engine_map(self):
492 def test_single_engine_map(self):
493 e0 = self.client[self.client.ids[0]]
493 e0 = self.client[self.client.ids[0]]
494 r = range(5)
494 r = range(5)
495 check = [ -1*i for i in r ]
495 check = [ -1*i for i in r ]
496 result = e0.map_sync(lambda x: -1*x, r)
496 result = e0.map_sync(lambda x: -1*x, r)
497 self.assertEqual(result, check)
497 self.assertEqual(result, check)
498
498
499 def test_len(self):
499 def test_len(self):
500 """len(view) makes sense"""
500 """len(view) makes sense"""
501 e0 = self.client[self.client.ids[0]]
501 e0 = self.client[self.client.ids[0]]
502 yield self.assertEqual(len(e0), 1)
502 yield self.assertEqual(len(e0), 1)
503 v = self.client[:]
503 v = self.client[:]
504 yield self.assertEqual(len(v), len(self.client.ids))
504 yield self.assertEqual(len(v), len(self.client.ids))
505 v = self.client.direct_view('all')
505 v = self.client.direct_view('all')
506 yield self.assertEqual(len(v), len(self.client.ids))
506 yield self.assertEqual(len(v), len(self.client.ids))
507 v = self.client[:2]
507 v = self.client[:2]
508 yield self.assertEqual(len(v), 2)
508 yield self.assertEqual(len(v), 2)
509 v = self.client[:1]
509 v = self.client[:1]
510 yield self.assertEqual(len(v), 1)
510 yield self.assertEqual(len(v), 1)
511 v = self.client.load_balanced_view()
511 v = self.client.load_balanced_view()
512 yield self.assertEqual(len(v), len(self.client.ids))
512 yield self.assertEqual(len(v), len(self.client.ids))
513 # parametric tests seem to require manual closing?
513 # parametric tests seem to require manual closing?
514 self.client.close()
514 self.client.close()
515
515
516
516
517 # begin execute tests
517 # begin execute tests
518
518
519 def test_execute_reply(self):
519 def test_execute_reply(self):
520 e0 = self.client[self.client.ids[0]]
520 e0 = self.client[self.client.ids[0]]
521 e0.block = True
521 e0.block = True
522 ar = e0.execute("5", silent=False)
522 ar = e0.execute("5", silent=False)
523 er = ar.get()
523 er = ar.get()
524 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
524 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
525 self.assertEqual(er.pyout['data']['text/plain'], '5')
525 self.assertEqual(er.pyout['data']['text/plain'], '5')
526
526
527 def test_execute_reply_stdout(self):
527 def test_execute_reply_stdout(self):
528 e0 = self.client[self.client.ids[0]]
528 e0 = self.client[self.client.ids[0]]
529 e0.block = True
529 e0.block = True
530 ar = e0.execute("print (5)", silent=False)
530 ar = e0.execute("print (5)", silent=False)
531 er = ar.get()
531 er = ar.get()
532 self.assertEqual(er.stdout.strip(), '5')
532 self.assertEqual(er.stdout.strip(), '5')
533
533
534 def test_execute_pyout(self):
534 def test_execute_pyout(self):
535 """execute triggers pyout with silent=False"""
535 """execute triggers pyout with silent=False"""
536 view = self.client[:]
536 view = self.client[:]
537 ar = view.execute("5", silent=False, block=True)
537 ar = view.execute("5", silent=False, block=True)
538
538
539 expected = [{'text/plain' : '5'}] * len(view)
539 expected = [{'text/plain' : '5'}] * len(view)
540 mimes = [ out['data'] for out in ar.pyout ]
540 mimes = [ out['data'] for out in ar.pyout ]
541 self.assertEqual(mimes, expected)
541 self.assertEqual(mimes, expected)
542
542
543 def test_execute_silent(self):
543 def test_execute_silent(self):
544 """execute does not trigger pyout with silent=True"""
544 """execute does not trigger pyout with silent=True"""
545 view = self.client[:]
545 view = self.client[:]
546 ar = view.execute("5", block=True)
546 ar = view.execute("5", block=True)
547 expected = [None] * len(view)
547 expected = [None] * len(view)
548 self.assertEqual(ar.pyout, expected)
548 self.assertEqual(ar.pyout, expected)
549
549
550 def test_execute_magic(self):
550 def test_execute_magic(self):
551 """execute accepts IPython commands"""
551 """execute accepts IPython commands"""
552 view = self.client[:]
552 view = self.client[:]
553 view.execute("a = 5")
553 view.execute("a = 5")
554 ar = view.execute("%whos", block=True)
554 ar = view.execute("%whos", block=True)
555 # this will raise, if that failed
555 # this will raise, if that failed
556 ar.get(5)
556 ar.get(5)
557 for stdout in ar.stdout:
557 for stdout in ar.stdout:
558 lines = stdout.splitlines()
558 lines = stdout.splitlines()
559 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
559 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
560 found = False
560 found = False
561 for line in lines[2:]:
561 for line in lines[2:]:
562 split = line.split()
562 split = line.split()
563 if split == ['a', 'int', '5']:
563 if split == ['a', 'int', '5']:
564 found = True
564 found = True
565 break
565 break
566 self.assertTrue(found, "whos output wrong: %s" % stdout)
566 self.assertTrue(found, "whos output wrong: %s" % stdout)
567
567
568 def test_execute_displaypub(self):
568 def test_execute_displaypub(self):
569 """execute tracks display_pub output"""
569 """execute tracks display_pub output"""
570 view = self.client[:]
570 view = self.client[:]
571 view.execute("from IPython.core.display import *")
571 view.execute("from IPython.core.display import *")
572 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
572 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
573
573
574 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
574 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
575 for outputs in ar.outputs:
575 for outputs in ar.outputs:
576 mimes = [ out['data'] for out in outputs ]
576 mimes = [ out['data'] for out in outputs ]
577 self.assertEqual(mimes, expected)
577 self.assertEqual(mimes, expected)
578
578
579 def test_apply_displaypub(self):
579 def test_apply_displaypub(self):
580 """apply tracks display_pub output"""
580 """apply tracks display_pub output"""
581 view = self.client[:]
581 view = self.client[:]
582 view.execute("from IPython.core.display import *")
582 view.execute("from IPython.core.display import *")
583
583
584 @interactive
584 @interactive
585 def publish():
585 def publish():
586 [ display(i) for i in range(5) ]
586 [ display(i) for i in range(5) ]
587
587
588 ar = view.apply_async(publish)
588 ar = view.apply_async(publish)
589 ar.get(5)
589 ar.get(5)
590 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
590 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
591 for outputs in ar.outputs:
591 for outputs in ar.outputs:
592 mimes = [ out['data'] for out in outputs ]
592 mimes = [ out['data'] for out in outputs ]
593 self.assertEqual(mimes, expected)
593 self.assertEqual(mimes, expected)
594
594
595 def test_execute_raises(self):
595 def test_execute_raises(self):
596 """exceptions in execute requests raise appropriately"""
596 """exceptions in execute requests raise appropriately"""
597 view = self.client[-1]
597 view = self.client[-1]
598 ar = view.execute("1/0")
598 ar = view.execute("1/0")
599 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
599 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
600
600
601 def test_remoteerror_render_exception(self):
601 def test_remoteerror_render_exception(self):
602 """RemoteErrors get nice tracebacks"""
602 """RemoteErrors get nice tracebacks"""
603 view = self.client[-1]
603 view = self.client[-1]
604 ar = view.execute("1/0")
604 ar = view.execute("1/0")
605 ip = get_ipython()
605 ip = get_ipython()
606 ip.user_ns['ar'] = ar
606 ip.user_ns['ar'] = ar
607 with capture_output() as io:
607 with capture_output() as io:
608 ip.run_cell("ar.get(2)")
608 ip.run_cell("ar.get(2)")
609
609
610 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
610 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
611
611
612 def test_compositeerror_render_exception(self):
612 def test_compositeerror_render_exception(self):
613 """CompositeErrors get nice tracebacks"""
613 """CompositeErrors get nice tracebacks"""
614 view = self.client[:]
614 view = self.client[:]
615 ar = view.execute("1/0")
615 ar = view.execute("1/0")
616 ip = get_ipython()
616 ip = get_ipython()
617 ip.user_ns['ar'] = ar
617 ip.user_ns['ar'] = ar
618 with capture_output() as io:
618 with capture_output() as io:
619 ip.run_cell("ar.get(2)")
619 ip.run_cell("ar.get(2)")
620
620
621 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
621 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
622 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
622 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
623 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
623 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
624
624
625 def test_compositeerror_truncate(self):
626 """Truncate CompositeErrors with many exceptions"""
627 view = self.client[:]
628 msg_ids = []
629 for i in range(10):
630 ar = view.execute("1/0")
631 msg_ids.extend(ar.msg_ids)
632
633 ar = self.client.get_result(msg_ids)
634 try:
635 ar.get()
636 except error.CompositeError as e:
637 pass
638 else:
639 self.fail("Should have raised CompositeError")
640
641 lines = e.render_traceback()
642 with capture_output() as io:
643 e.print_traceback()
644
645 self.assertTrue("more exceptions" in lines[-1])
646 count = e.tb_limit
647
648 self.assertEqual(io.stdout.count('ZeroDivisionError'), 2 * count, io.stdout)
649 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
650 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
651
625 @dec.skipif_not_matplotlib
652 @dec.skipif_not_matplotlib
626 def test_magic_pylab(self):
653 def test_magic_pylab(self):
627 """%pylab works on engines"""
654 """%pylab works on engines"""
628 view = self.client[-1]
655 view = self.client[-1]
629 ar = view.execute("%pylab inline")
656 ar = view.execute("%pylab inline")
630 # at least check if this raised:
657 # at least check if this raised:
631 reply = ar.get(5)
658 reply = ar.get(5)
632 # include imports, in case user config
659 # include imports, in case user config
633 ar = view.execute("plot(rand(100))", silent=False)
660 ar = view.execute("plot(rand(100))", silent=False)
634 reply = ar.get(5)
661 reply = ar.get(5)
635 self.assertEqual(len(reply.outputs), 1)
662 self.assertEqual(len(reply.outputs), 1)
636 output = reply.outputs[0]
663 output = reply.outputs[0]
637 self.assertTrue("data" in output)
664 self.assertTrue("data" in output)
638 data = output['data']
665 data = output['data']
639 self.assertTrue("image/png" in data)
666 self.assertTrue("image/png" in data)
640
667
641 def test_func_default_func(self):
668 def test_func_default_func(self):
642 """interactively defined function as apply func default"""
669 """interactively defined function as apply func default"""
643 def foo():
670 def foo():
644 return 'foo'
671 return 'foo'
645
672
646 def bar(f=foo):
673 def bar(f=foo):
647 return f()
674 return f()
648
675
649 view = self.client[-1]
676 view = self.client[-1]
650 ar = view.apply_async(bar)
677 ar = view.apply_async(bar)
651 r = ar.get(10)
678 r = ar.get(10)
652 self.assertEqual(r, 'foo')
679 self.assertEqual(r, 'foo')
653 def test_data_pub_single(self):
680 def test_data_pub_single(self):
654 view = self.client[-1]
681 view = self.client[-1]
655 ar = view.execute('\n'.join([
682 ar = view.execute('\n'.join([
656 'from IPython.kernel.zmq.datapub import publish_data',
683 'from IPython.kernel.zmq.datapub import publish_data',
657 'for i in range(5):',
684 'for i in range(5):',
658 ' publish_data(dict(i=i))'
685 ' publish_data(dict(i=i))'
659 ]), block=False)
686 ]), block=False)
660 self.assertTrue(isinstance(ar.data, dict))
687 self.assertTrue(isinstance(ar.data, dict))
661 ar.get(5)
688 ar.get(5)
662 self.assertEqual(ar.data, dict(i=4))
689 self.assertEqual(ar.data, dict(i=4))
663
690
664 def test_data_pub(self):
691 def test_data_pub(self):
665 view = self.client[:]
692 view = self.client[:]
666 ar = view.execute('\n'.join([
693 ar = view.execute('\n'.join([
667 'from IPython.kernel.zmq.datapub import publish_data',
694 'from IPython.kernel.zmq.datapub import publish_data',
668 'for i in range(5):',
695 'for i in range(5):',
669 ' publish_data(dict(i=i))'
696 ' publish_data(dict(i=i))'
670 ]), block=False)
697 ]), block=False)
671 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
698 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
672 ar.get(5)
699 ar.get(5)
673 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
700 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
674
701
675 def test_can_list_arg(self):
702 def test_can_list_arg(self):
676 """args in lists are canned"""
703 """args in lists are canned"""
677 view = self.client[-1]
704 view = self.client[-1]
678 view['a'] = 128
705 view['a'] = 128
679 rA = pmod.Reference('a')
706 rA = pmod.Reference('a')
680 ar = view.apply_async(lambda x: x, [rA])
707 ar = view.apply_async(lambda x: x, [rA])
681 r = ar.get(5)
708 r = ar.get(5)
682 self.assertEqual(r, [128])
709 self.assertEqual(r, [128])
683
710
684 def test_can_dict_arg(self):
711 def test_can_dict_arg(self):
685 """args in dicts are canned"""
712 """args in dicts are canned"""
686 view = self.client[-1]
713 view = self.client[-1]
687 view['a'] = 128
714 view['a'] = 128
688 rA = pmod.Reference('a')
715 rA = pmod.Reference('a')
689 ar = view.apply_async(lambda x: x, dict(foo=rA))
716 ar = view.apply_async(lambda x: x, dict(foo=rA))
690 r = ar.get(5)
717 r = ar.get(5)
691 self.assertEqual(r, dict(foo=128))
718 self.assertEqual(r, dict(foo=128))
692
719
693 def test_can_list_kwarg(self):
720 def test_can_list_kwarg(self):
694 """kwargs in lists are canned"""
721 """kwargs in lists are canned"""
695 view = self.client[-1]
722 view = self.client[-1]
696 view['a'] = 128
723 view['a'] = 128
697 rA = pmod.Reference('a')
724 rA = pmod.Reference('a')
698 ar = view.apply_async(lambda x=5: x, x=[rA])
725 ar = view.apply_async(lambda x=5: x, x=[rA])
699 r = ar.get(5)
726 r = ar.get(5)
700 self.assertEqual(r, [128])
727 self.assertEqual(r, [128])
701
728
702 def test_can_dict_kwarg(self):
729 def test_can_dict_kwarg(self):
703 """kwargs in dicts are canned"""
730 """kwargs in dicts are canned"""
704 view = self.client[-1]
731 view = self.client[-1]
705 view['a'] = 128
732 view['a'] = 128
706 rA = pmod.Reference('a')
733 rA = pmod.Reference('a')
707 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
734 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
708 r = ar.get(5)
735 r = ar.get(5)
709 self.assertEqual(r, dict(foo=128))
736 self.assertEqual(r, dict(foo=128))
710
737
711 def test_map_ref(self):
738 def test_map_ref(self):
712 """view.map works with references"""
739 """view.map works with references"""
713 view = self.client[:]
740 view = self.client[:]
714 ranks = sorted(self.client.ids)
741 ranks = sorted(self.client.ids)
715 view.scatter('rank', ranks, flatten=True)
742 view.scatter('rank', ranks, flatten=True)
716 rrank = pmod.Reference('rank')
743 rrank = pmod.Reference('rank')
717
744
718 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
745 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
719 drank = amr.get(5)
746 drank = amr.get(5)
720 self.assertEqual(drank, [ r*2 for r in ranks ])
747 self.assertEqual(drank, [ r*2 for r in ranks ])
721
748
722 def test_nested_getitem_setitem(self):
749 def test_nested_getitem_setitem(self):
723 """get and set with view['a.b']"""
750 """get and set with view['a.b']"""
724 view = self.client[-1]
751 view = self.client[-1]
725 view.execute('\n'.join([
752 view.execute('\n'.join([
726 'class A(object): pass',
753 'class A(object): pass',
727 'a = A()',
754 'a = A()',
728 'a.b = 128',
755 'a.b = 128',
729 ]), block=True)
756 ]), block=True)
730 ra = pmod.Reference('a')
757 ra = pmod.Reference('a')
731
758
732 r = view.apply_sync(lambda x: x.b, ra)
759 r = view.apply_sync(lambda x: x.b, ra)
733 self.assertEqual(r, 128)
760 self.assertEqual(r, 128)
734 self.assertEqual(view['a.b'], 128)
761 self.assertEqual(view['a.b'], 128)
735
762
736 view['a.b'] = 0
763 view['a.b'] = 0
737
764
738 r = view.apply_sync(lambda x: x.b, ra)
765 r = view.apply_sync(lambda x: x.b, ra)
739 self.assertEqual(r, 0)
766 self.assertEqual(r, 0)
740 self.assertEqual(view['a.b'], 0)
767 self.assertEqual(view['a.b'], 0)
741
768
742 def test_return_namedtuple(self):
769 def test_return_namedtuple(self):
743 def namedtuplify(x, y):
770 def namedtuplify(x, y):
744 from IPython.parallel.tests.test_view import point
771 from IPython.parallel.tests.test_view import point
745 return point(x, y)
772 return point(x, y)
746
773
747 view = self.client[-1]
774 view = self.client[-1]
748 p = view.apply_sync(namedtuplify, 1, 2)
775 p = view.apply_sync(namedtuplify, 1, 2)
749 self.assertEqual(p.x, 1)
776 self.assertEqual(p.x, 1)
750 self.assertEqual(p.y, 2)
777 self.assertEqual(p.y, 2)
751
778
752 def test_apply_namedtuple(self):
779 def test_apply_namedtuple(self):
753 def echoxy(p):
780 def echoxy(p):
754 return p.y, p.x
781 return p.y, p.x
755
782
756 view = self.client[-1]
783 view = self.client[-1]
757 tup = view.apply_sync(echoxy, point(1, 2))
784 tup = view.apply_sync(echoxy, point(1, 2))
758 self.assertEqual(tup, (2,1))
785 self.assertEqual(tup, (2,1))
759
786
@@ -1,694 +1,697 b''
1 .. _parallel_multiengine:
1 .. _parallel_multiengine:
2
2
3 ==========================
3 ==========================
4 IPython's Direct interface
4 IPython's Direct interface
5 ==========================
5 ==========================
6
6
7 The direct, or multiengine, interface represents one possible way of working with a set of
7 The direct, or multiengine, interface represents one possible way of working with a set of
8 IPython engines. The basic idea behind the multiengine interface is that the
8 IPython engines. The basic idea behind the multiengine interface is that the
9 capabilities of each engine are directly and explicitly exposed to the user.
9 capabilities of each engine are directly and explicitly exposed to the user.
10 Thus, in the multiengine interface, each engine is given an id that is used to
10 Thus, in the multiengine interface, each engine is given an id that is used to
11 identify the engine and give it work to do. This interface is very intuitive
11 identify the engine and give it work to do. This interface is very intuitive
12 and is designed with interactive usage in mind, and is the best place for
12 and is designed with interactive usage in mind, and is the best place for
13 new users of IPython to begin.
13 new users of IPython to begin.
14
14
15 Starting the IPython controller and engines
15 Starting the IPython controller and engines
16 ===========================================
16 ===========================================
17
17
18 To follow along with this tutorial, you will need to start the IPython
18 To follow along with this tutorial, you will need to start the IPython
19 controller and four IPython engines. The simplest way of doing this is to use
19 controller and four IPython engines. The simplest way of doing this is to use
20 the :command:`ipcluster` command::
20 the :command:`ipcluster` command::
21
21
22 $ ipcluster start -n 4
22 $ ipcluster start -n 4
23
23
24 For more detailed information about starting the controller and engines, see
24 For more detailed information about starting the controller and engines, see
25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
25 our :ref:`introduction <parallel_overview>` to using IPython for parallel computing.
26
26
27 Creating a ``DirectView`` instance
27 Creating a ``DirectView`` instance
28 ==================================
28 ==================================
29
29
30 The first step is to import the IPython :mod:`IPython.parallel`
30 The first step is to import the IPython :mod:`IPython.parallel`
31 module and then create a :class:`.Client` instance:
31 module and then create a :class:`.Client` instance:
32
32
33 .. sourcecode:: ipython
33 .. sourcecode:: ipython
34
34
35 In [1]: from IPython.parallel import Client
35 In [1]: from IPython.parallel import Client
36
36
37 In [2]: rc = Client()
37 In [2]: rc = Client()
38
38
39 This form assumes that the default connection information (stored in
39 This form assumes that the default connection information (stored in
40 :file:`ipcontroller-client.json` found in :file:`IPYTHONDIR/profile_default/security`) is
40 :file:`ipcontroller-client.json` found in :file:`IPYTHONDIR/profile_default/security`) is
41 accurate. If the controller was started on a remote machine, you must copy that connection
41 accurate. If the controller was started on a remote machine, you must copy that connection
42 file to the client machine, or enter its contents as arguments to the Client constructor:
42 file to the client machine, or enter its contents as arguments to the Client constructor:
43
43
44 .. sourcecode:: ipython
44 .. sourcecode:: ipython
45
45
46 # If you have copied the json connector file from the controller:
46 # If you have copied the json connector file from the controller:
47 In [2]: rc = Client('/path/to/ipcontroller-client.json')
47 In [2]: rc = Client('/path/to/ipcontroller-client.json')
48 # or to connect with a specific profile you have set up:
48 # or to connect with a specific profile you have set up:
49 In [3]: rc = Client(profile='mpi')
49 In [3]: rc = Client(profile='mpi')
50
50
51
51
52 To make sure there are engines connected to the controller, users can get a list
52 To make sure there are engines connected to the controller, users can get a list
53 of engine ids:
53 of engine ids:
54
54
55 .. sourcecode:: ipython
55 .. sourcecode:: ipython
56
56
57 In [3]: rc.ids
57 In [3]: rc.ids
58 Out[3]: [0, 1, 2, 3]
58 Out[3]: [0, 1, 2, 3]
59
59
60 Here we see that there are four engines ready to do work for us.
60 Here we see that there are four engines ready to do work for us.
61
61
62 For direct execution, we will make use of a :class:`DirectView` object, which can be
62 For direct execution, we will make use of a :class:`DirectView` object, which can be
63 constructed via list-access to the client:
63 constructed via list-access to the client:
64
64
65 .. sourcecode:: ipython
65 .. sourcecode:: ipython
66
66
67 In [4]: dview = rc[:] # use all engines
67 In [4]: dview = rc[:] # use all engines
68
68
69 .. seealso::
69 .. seealso::
70
70
71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
71 For more information, see the in-depth explanation of :ref:`Views <parallel_details>`.
72
72
73
73
74 Quick and easy parallelism
74 Quick and easy parallelism
75 ==========================
75 ==========================
76
76
77 In many cases, you simply want to apply a Python function to a sequence of
77 In many cases, you simply want to apply a Python function to a sequence of
78 objects, but *in parallel*. The client interface provides a simple way
78 objects, but *in parallel*. The client interface provides a simple way
79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
79 of accomplishing this: using the DirectView's :meth:`~DirectView.map` method.
80
80
81 Parallel map
81 Parallel map
82 ------------
82 ------------
83
83
84 Python's builtin :func:`map` functions allows a function to be applied to a
84 Python's builtin :func:`map` functions allows a function to be applied to a
85 sequence element-by-element. This type of code is typically trivial to
85 sequence element-by-element. This type of code is typically trivial to
86 parallelize. In fact, since IPython's interface is all about functions anyway,
86 parallelize. In fact, since IPython's interface is all about functions anyway,
87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
87 you can just use the builtin :func:`map` with a :class:`RemoteFunction`, or a
88 DirectView's :meth:`map` method:
88 DirectView's :meth:`map` method:
89
89
90 .. sourcecode:: ipython
90 .. sourcecode:: ipython
91
91
92 In [62]: serial_result = map(lambda x:x**10, range(32))
92 In [62]: serial_result = map(lambda x:x**10, range(32))
93
93
94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
94 In [63]: parallel_result = dview.map_sync(lambda x: x**10, range(32))
95
95
96 In [67]: serial_result==parallel_result
96 In [67]: serial_result==parallel_result
97 Out[67]: True
97 Out[67]: True
98
98
99
99
100 .. note::
100 .. note::
101
101
102 The :class:`DirectView`'s version of :meth:`map` does
102 The :class:`DirectView`'s version of :meth:`map` does
103 not do dynamic load balancing. For a load balanced version, use a
103 not do dynamic load balancing. For a load balanced version, use a
104 :class:`LoadBalancedView`.
104 :class:`LoadBalancedView`.
105
105
106 .. seealso::
106 .. seealso::
107
107
108 :meth:`map` is implemented via :class:`ParallelFunction`.
108 :meth:`map` is implemented via :class:`ParallelFunction`.
109
109
110 Remote function decorators
110 Remote function decorators
111 --------------------------
111 --------------------------
112
112
113 Remote functions are just like normal functions, but when they are called,
113 Remote functions are just like normal functions, but when they are called,
114 they execute on one or more engines, rather than locally. IPython provides
114 they execute on one or more engines, rather than locally. IPython provides
115 two decorators:
115 two decorators:
116
116
117 .. sourcecode:: ipython
117 .. sourcecode:: ipython
118
118
119 In [10]: @dview.remote(block=True)
119 In [10]: @dview.remote(block=True)
120 ....: def getpid():
120 ....: def getpid():
121 ....: import os
121 ....: import os
122 ....: return os.getpid()
122 ....: return os.getpid()
123 ....:
123 ....:
124
124
125 In [11]: getpid()
125 In [11]: getpid()
126 Out[11]: [12345, 12346, 12347, 12348]
126 Out[11]: [12345, 12346, 12347, 12348]
127
127
128 The ``@parallel`` decorator creates parallel functions, that break up an element-wise
128 The ``@parallel`` decorator creates parallel functions, that break up an element-wise
129 operations and distribute them, reconstructing the result.
129 operations and distribute them, reconstructing the result.
130
130
131 .. sourcecode:: ipython
131 .. sourcecode:: ipython
132
132
133 In [12]: import numpy as np
133 In [12]: import numpy as np
134
134
135 In [13]: A = np.random.random((64,48))
135 In [13]: A = np.random.random((64,48))
136
136
137 In [14]: @dview.parallel(block=True)
137 In [14]: @dview.parallel(block=True)
138 ....: def pmul(A,B):
138 ....: def pmul(A,B):
139 ....: return A*B
139 ....: return A*B
140
140
141 In [15]: C_local = A*A
141 In [15]: C_local = A*A
142
142
143 In [16]: C_remote = pmul(A,A)
143 In [16]: C_remote = pmul(A,A)
144
144
145 In [17]: (C_local == C_remote).all()
145 In [17]: (C_local == C_remote).all()
146 Out[17]: True
146 Out[17]: True
147
147
148 Calling a ``@parallel`` function *does not* correspond to map. It is used for splitting
148 Calling a ``@parallel`` function *does not* correspond to map. It is used for splitting
149 element-wise operations that operate on a sequence or array. For ``map`` behavior,
149 element-wise operations that operate on a sequence or array. For ``map`` behavior,
150 parallel functions do have a map method.
150 parallel functions do have a map method.
151
151
152 ==================== ============================ =============================
152 ==================== ============================ =============================
153 call pfunc(seq) pfunc.map(seq)
153 call pfunc(seq) pfunc.map(seq)
154 ==================== ============================ =============================
154 ==================== ============================ =============================
155 # of tasks # of engines (1 per engine) # of engines (1 per engine)
155 # of tasks # of engines (1 per engine) # of engines (1 per engine)
156 # of remote calls # of engines (1 per engine) ``len(seq)``
156 # of remote calls # of engines (1 per engine) ``len(seq)``
157 argument to remote ``seq[i:j]`` (sub-sequence) ``seq[i]`` (single element)
157 argument to remote ``seq[i:j]`` (sub-sequence) ``seq[i]`` (single element)
158 ==================== ============================ =============================
158 ==================== ============================ =============================
159
159
160 A quick example to illustrate the difference in arguments for the two modes:
160 A quick example to illustrate the difference in arguments for the two modes:
161
161
162 .. sourcecode:: ipython
162 .. sourcecode:: ipython
163
163
164 In [16]: @dview.parallel(block=True)
164 In [16]: @dview.parallel(block=True)
165 ....: def echo(x):
165 ....: def echo(x):
166 ....: return str(x)
166 ....: return str(x)
167 ....:
167 ....:
168
168
169 In [17]: echo(range(5))
169 In [17]: echo(range(5))
170 Out[17]: ['[0, 1]', '[2]', '[3]', '[4]']
170 Out[17]: ['[0, 1]', '[2]', '[3]', '[4]']
171
171
172 In [18]: echo.map(range(5))
172 In [18]: echo.map(range(5))
173 Out[18]: ['0', '1', '2', '3', '4']
173 Out[18]: ['0', '1', '2', '3', '4']
174
174
175
175
176 .. seealso::
176 .. seealso::
177
177
178 See the :func:`~.remotefunction.parallel` and :func:`~.remotefunction.remote`
178 See the :func:`~.remotefunction.parallel` and :func:`~.remotefunction.remote`
179 decorators for options.
179 decorators for options.
180
180
181 Calling Python functions
181 Calling Python functions
182 ========================
182 ========================
183
183
184 The most basic type of operation that can be performed on the engines is to
184 The most basic type of operation that can be performed on the engines is to
185 execute Python code or call Python functions. Executing Python code can be
185 execute Python code or call Python functions. Executing Python code can be
186 done in blocking or non-blocking mode (non-blocking is default) using the
186 done in blocking or non-blocking mode (non-blocking is default) using the
187 :meth:`.View.execute` method, and calling functions can be done via the
187 :meth:`.View.execute` method, and calling functions can be done via the
188 :meth:`.View.apply` method.
188 :meth:`.View.apply` method.
189
189
190 apply
190 apply
191 -----
191 -----
192
192
193 The main method for doing remote execution (in fact, all methods that
193 The main method for doing remote execution (in fact, all methods that
194 communicate with the engines are built on top of it), is :meth:`View.apply`.
194 communicate with the engines are built on top of it), is :meth:`View.apply`.
195
195
196 We strive to provide the cleanest interface we can, so `apply` has the following
196 We strive to provide the cleanest interface we can, so `apply` has the following
197 signature:
197 signature:
198
198
199 .. sourcecode:: python
199 .. sourcecode:: python
200
200
201 view.apply(f, *args, **kwargs)
201 view.apply(f, *args, **kwargs)
202
202
203 There are various ways to call functions with IPython, and these flags are set as
203 There are various ways to call functions with IPython, and these flags are set as
204 attributes of the View. The ``DirectView`` has just two of these flags:
204 attributes of the View. The ``DirectView`` has just two of these flags:
205
205
206 dv.block : bool
206 dv.block : bool
207 whether to wait for the result, or return an :class:`AsyncResult` object
207 whether to wait for the result, or return an :class:`AsyncResult` object
208 immediately
208 immediately
209 dv.track : bool
209 dv.track : bool
210 whether to instruct pyzmq to track when zeromq is done sending the message.
210 whether to instruct pyzmq to track when zeromq is done sending the message.
211 This is primarily useful for non-copying sends of numpy arrays that you plan to
211 This is primarily useful for non-copying sends of numpy arrays that you plan to
212 edit in-place. You need to know when it becomes safe to edit the buffer
212 edit in-place. You need to know when it becomes safe to edit the buffer
213 without corrupting the message.
213 without corrupting the message.
214 dv.targets : int, list of ints
214 dv.targets : int, list of ints
215 which targets this view is associated with.
215 which targets this view is associated with.
216
216
217
217
218 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
218 Creating a view is simple: index-access on a client creates a :class:`.DirectView`.
219
219
220 .. sourcecode:: ipython
220 .. sourcecode:: ipython
221
221
222 In [4]: view = rc[1:3]
222 In [4]: view = rc[1:3]
223 Out[4]: <DirectView [1, 2]>
223 Out[4]: <DirectView [1, 2]>
224
224
225 In [5]: view.apply<tab>
225 In [5]: view.apply<tab>
226 view.apply view.apply_async view.apply_sync
226 view.apply view.apply_async view.apply_sync
227
227
228 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
228 For convenience, you can set block temporarily for a single call with the extra sync/async methods.
229
229
230 Blocking execution
230 Blocking execution
231 ------------------
231 ------------------
232
232
233 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
233 In blocking mode, the :class:`.DirectView` object (called ``dview`` in
234 these examples) submits the command to the controller, which places the
234 these examples) submits the command to the controller, which places the
235 command in the engines' queues for execution. The :meth:`apply` call then
235 command in the engines' queues for execution. The :meth:`apply` call then
236 blocks until the engines are done executing the command:
236 blocks until the engines are done executing the command:
237
237
238 .. sourcecode:: ipython
238 .. sourcecode:: ipython
239
239
240 In [2]: dview = rc[:] # A DirectView of all engines
240 In [2]: dview = rc[:] # A DirectView of all engines
241 In [3]: dview.block=True
241 In [3]: dview.block=True
242 In [4]: dview['a'] = 5
242 In [4]: dview['a'] = 5
243
243
244 In [5]: dview['b'] = 10
244 In [5]: dview['b'] = 10
245
245
246 In [6]: dview.apply(lambda x: a+b+x, 27)
246 In [6]: dview.apply(lambda x: a+b+x, 27)
247 Out[6]: [42, 42, 42, 42]
247 Out[6]: [42, 42, 42, 42]
248
248
249 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
249 You can also select blocking execution on a call-by-call basis with the :meth:`apply_sync`
250 method:
250 method:
251
251
252 .. sourcecode:: ipython
252 .. sourcecode:: ipython
253
253
254 In [7]: dview.block=False
254 In [7]: dview.block=False
255
255
256 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
256 In [8]: dview.apply_sync(lambda x: a+b+x, 27)
257 Out[8]: [42, 42, 42, 42]
257 Out[8]: [42, 42, 42, 42]
258
258
259 Python commands can be executed as strings on specific engines by using a View's ``execute``
259 Python commands can be executed as strings on specific engines by using a View's ``execute``
260 method:
260 method:
261
261
262 .. sourcecode:: ipython
262 .. sourcecode:: ipython
263
263
264 In [6]: rc[::2].execute('c=a+b')
264 In [6]: rc[::2].execute('c=a+b')
265
265
266 In [7]: rc[1::2].execute('c=a-b')
266 In [7]: rc[1::2].execute('c=a-b')
267
267
268 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
268 In [8]: dview['c'] # shorthand for dview.pull('c', block=True)
269 Out[8]: [15, -5, 15, -5]
269 Out[8]: [15, -5, 15, -5]
270
270
271
271
272 Non-blocking execution
272 Non-blocking execution
273 ----------------------
273 ----------------------
274
274
275 In non-blocking mode, :meth:`apply` submits the command to be executed and
275 In non-blocking mode, :meth:`apply` submits the command to be executed and
276 then returns a :class:`AsyncResult` object immediately. The
276 then returns a :class:`AsyncResult` object immediately. The
277 :class:`AsyncResult` object gives you a way of getting a result at a later
277 :class:`AsyncResult` object gives you a way of getting a result at a later
278 time through its :meth:`get` method.
278 time through its :meth:`get` method.
279
279
280 .. seealso::
280 .. seealso::
281
281
282 Docs on the :ref:`AsyncResult <parallel_asyncresult>` object.
282 Docs on the :ref:`AsyncResult <parallel_asyncresult>` object.
283
283
284 This allows you to quickly submit long running commands without blocking your
284 This allows you to quickly submit long running commands without blocking your
285 local Python/IPython session:
285 local Python/IPython session:
286
286
287 .. sourcecode:: ipython
287 .. sourcecode:: ipython
288
288
289 # define our function
289 # define our function
290 In [6]: def wait(t):
290 In [6]: def wait(t):
291 ....: import time
291 ....: import time
292 ....: tic = time.time()
292 ....: tic = time.time()
293 ....: time.sleep(t)
293 ....: time.sleep(t)
294 ....: return time.time()-tic
294 ....: return time.time()-tic
295
295
296 # In non-blocking mode
296 # In non-blocking mode
297 In [7]: ar = dview.apply_async(wait, 2)
297 In [7]: ar = dview.apply_async(wait, 2)
298
298
299 # Now block for the result
299 # Now block for the result
300 In [8]: ar.get()
300 In [8]: ar.get()
301 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
301 Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154]
302
302
303 # Again in non-blocking mode
303 # Again in non-blocking mode
304 In [9]: ar = dview.apply_async(wait, 10)
304 In [9]: ar = dview.apply_async(wait, 10)
305
305
306 # Poll to see if the result is ready
306 # Poll to see if the result is ready
307 In [10]: ar.ready()
307 In [10]: ar.ready()
308 Out[10]: False
308 Out[10]: False
309
309
310 # ask for the result, but wait a maximum of 1 second:
310 # ask for the result, but wait a maximum of 1 second:
311 In [45]: ar.get(1)
311 In [45]: ar.get(1)
312 ---------------------------------------------------------------------------
312 ---------------------------------------------------------------------------
313 TimeoutError Traceback (most recent call last)
313 TimeoutError Traceback (most recent call last)
314 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
314 /home/you/<ipython-input-45-7cd858bbb8e0> in <module>()
315 ----> 1 ar.get(1)
315 ----> 1 ar.get(1)
316
316
317 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
317 /path/to/site-packages/IPython/parallel/asyncresult.pyc in get(self, timeout)
318 62 raise self._exception
318 62 raise self._exception
319 63 else:
319 63 else:
320 ---> 64 raise error.TimeoutError("Result not ready.")
320 ---> 64 raise error.TimeoutError("Result not ready.")
321 65
321 65
322 66 def ready(self):
322 66 def ready(self):
323
323
324 TimeoutError: Result not ready.
324 TimeoutError: Result not ready.
325
325
326 .. Note::
326 .. Note::
327
327
328 Note the import inside the function. This is a common model, to ensure
328 Note the import inside the function. This is a common model, to ensure
329 that the appropriate modules are imported where the task is run. You can
329 that the appropriate modules are imported where the task is run. You can
330 also manually import modules into the engine(s) namespace(s) via
330 also manually import modules into the engine(s) namespace(s) via
331 :meth:`view.execute('import numpy')`.
331 :meth:`view.execute('import numpy')`.
332
332
333 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
333 Often, it is desirable to wait until a set of :class:`AsyncResult` objects
334 are done. For this, there is a the method :meth:`wait`. This method takes a
334 are done. For this, there is a the method :meth:`wait`. This method takes a
335 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
335 tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History),
336 and blocks until all of the associated results are ready:
336 and blocks until all of the associated results are ready:
337
337
338 .. sourcecode:: ipython
338 .. sourcecode:: ipython
339
339
340 In [72]: dview.block=False
340 In [72]: dview.block=False
341
341
342 # A trivial list of AsyncResults objects
342 # A trivial list of AsyncResults objects
343 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
343 In [73]: pr_list = [dview.apply_async(wait, 3) for i in range(10)]
344
344
345 # Wait until all of them are done
345 # Wait until all of them are done
346 In [74]: dview.wait(pr_list)
346 In [74]: dview.wait(pr_list)
347
347
348 # Then, their results are ready using get() or the `.r` attribute
348 # Then, their results are ready using get() or the `.r` attribute
349 In [75]: pr_list[0].get()
349 In [75]: pr_list[0].get()
350 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
350 Out[75]: [2.9982571601867676, 2.9982588291168213, 2.9987530708312988, 2.9990990161895752]
351
351
352
352
353
353
354 The ``block`` and ``targets`` keyword arguments and attributes
354 The ``block`` and ``targets`` keyword arguments and attributes
355 --------------------------------------------------------------
355 --------------------------------------------------------------
356
356
357 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
357 Most DirectView methods (excluding :meth:`apply`) accept ``block`` and
358 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
358 ``targets`` as keyword arguments. As we have seen above, these keyword arguments control the
359 blocking mode and which engines the command is applied to. The :class:`View` class also has
359 blocking mode and which engines the command is applied to. The :class:`View` class also has
360 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
360 :attr:`block` and :attr:`targets` attributes that control the default behavior when the keyword
361 arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:
361 arguments are not provided. Thus the following logic is used for :attr:`block` and :attr:`targets`:
362
362
363 * If no keyword argument is provided, the instance attributes are used.
363 * If no keyword argument is provided, the instance attributes are used.
364 * The Keyword arguments, if provided overrides the instance attributes for
364 * The Keyword arguments, if provided overrides the instance attributes for
365 the duration of a single call.
365 the duration of a single call.
366
366
367 The following examples demonstrate how to use the instance attributes:
367 The following examples demonstrate how to use the instance attributes:
368
368
369 .. sourcecode:: ipython
369 .. sourcecode:: ipython
370
370
371 In [16]: dview.targets = [0,2]
371 In [16]: dview.targets = [0,2]
372
372
373 In [17]: dview.block = False
373 In [17]: dview.block = False
374
374
375 In [18]: ar = dview.apply(lambda : 10)
375 In [18]: ar = dview.apply(lambda : 10)
376
376
377 In [19]: ar.get()
377 In [19]: ar.get()
378 Out[19]: [10, 10]
378 Out[19]: [10, 10]
379
379
380 In [20]: dview.targets = v.client.ids # all engines (4)
380 In [20]: dview.targets = v.client.ids # all engines (4)
381
381
382 In [21]: dview.block = True
382 In [21]: dview.block = True
383
383
384 In [22]: dview.apply(lambda : 42)
384 In [22]: dview.apply(lambda : 42)
385 Out[22]: [42, 42, 42, 42]
385 Out[22]: [42, 42, 42, 42]
386
386
387 The :attr:`block` and :attr:`targets` instance attributes of the
387 The :attr:`block` and :attr:`targets` instance attributes of the
388 :class:`.DirectView` also determine the behavior of the parallel magic commands.
388 :class:`.DirectView` also determine the behavior of the parallel magic commands.
389
389
390 .. seealso::
390 .. seealso::
391
391
392 See the documentation of the :ref:`Parallel Magics <parallel_magics>`.
392 See the documentation of the :ref:`Parallel Magics <parallel_magics>`.
393
393
394
394
395 Moving Python objects around
395 Moving Python objects around
396 ============================
396 ============================
397
397
398 In addition to calling functions and executing code on engines, you can
398 In addition to calling functions and executing code on engines, you can
399 transfer Python objects to and from your IPython session and the engines. In
399 transfer Python objects to and from your IPython session and the engines. In
400 IPython, these operations are called :meth:`push` (sending an object to the
400 IPython, these operations are called :meth:`push` (sending an object to the
401 engines) and :meth:`pull` (getting an object from the engines).
401 engines) and :meth:`pull` (getting an object from the engines).
402
402
403 Basic push and pull
403 Basic push and pull
404 -------------------
404 -------------------
405
405
406 Here are some examples of how you use :meth:`push` and :meth:`pull`:
406 Here are some examples of how you use :meth:`push` and :meth:`pull`:
407
407
408 .. sourcecode:: ipython
408 .. sourcecode:: ipython
409
409
410 In [38]: dview.push(dict(a=1.03234,b=3453))
410 In [38]: dview.push(dict(a=1.03234,b=3453))
411 Out[38]: [None,None,None,None]
411 Out[38]: [None,None,None,None]
412
412
413 In [39]: dview.pull('a')
413 In [39]: dview.pull('a')
414 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
414 Out[39]: [ 1.03234, 1.03234, 1.03234, 1.03234]
415
415
416 In [40]: dview.pull('b', targets=0)
416 In [40]: dview.pull('b', targets=0)
417 Out[40]: 3453
417 Out[40]: 3453
418
418
419 In [41]: dview.pull(('a','b'))
419 In [41]: dview.pull(('a','b'))
420 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
420 Out[41]: [ [1.03234, 3453], [1.03234, 3453], [1.03234, 3453], [1.03234, 3453] ]
421
421
422 In [42]: dview.push(dict(c='speed'))
422 In [42]: dview.push(dict(c='speed'))
423 Out[42]: [None,None,None,None]
423 Out[42]: [None,None,None,None]
424
424
425 In non-blocking mode :meth:`push` and :meth:`pull` also return
425 In non-blocking mode :meth:`push` and :meth:`pull` also return
426 :class:`AsyncResult` objects:
426 :class:`AsyncResult` objects:
427
427
428 .. sourcecode:: ipython
428 .. sourcecode:: ipython
429
429
430 In [48]: ar = dview.pull('a', block=False)
430 In [48]: ar = dview.pull('a', block=False)
431
431
432 In [49]: ar.get()
432 In [49]: ar.get()
433 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
433 Out[49]: [1.03234, 1.03234, 1.03234, 1.03234]
434
434
435
435
436 Dictionary interface
436 Dictionary interface
437 --------------------
437 --------------------
438
438
439 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
439 Since a Python namespace is just a :class:`dict`, :class:`DirectView` objects provide
440 dictionary-style access by key and methods such as :meth:`get` and
440 dictionary-style access by key and methods such as :meth:`get` and
441 :meth:`update` for convenience. This make the remote namespaces of the engines
441 :meth:`update` for convenience. This make the remote namespaces of the engines
442 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
442 appear as a local dictionary. Underneath, these methods call :meth:`apply`:
443
443
444 .. sourcecode:: ipython
444 .. sourcecode:: ipython
445
445
446 In [51]: dview['a']=['foo','bar']
446 In [51]: dview['a']=['foo','bar']
447
447
448 In [52]: dview['a']
448 In [52]: dview['a']
449 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
449 Out[52]: [ ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'], ['foo', 'bar'] ]
450
450
451 Scatter and gather
451 Scatter and gather
452 ------------------
452 ------------------
453
453
454 Sometimes it is useful to partition a sequence and push the partitions to
454 Sometimes it is useful to partition a sequence and push the partitions to
455 different engines. In MPI language, this is know as scatter/gather and we
455 different engines. In MPI language, this is know as scatter/gather and we
456 follow that terminology. However, it is important to remember that in
456 follow that terminology. However, it is important to remember that in
457 IPython's :class:`Client` class, :meth:`scatter` is from the
457 IPython's :class:`Client` class, :meth:`scatter` is from the
458 interactive IPython session to the engines and :meth:`gather` is from the
458 interactive IPython session to the engines and :meth:`gather` is from the
459 engines back to the interactive IPython session. For scatter/gather operations
459 engines back to the interactive IPython session. For scatter/gather operations
460 between engines, MPI, pyzmq, or some other direct interconnect should be used.
460 between engines, MPI, pyzmq, or some other direct interconnect should be used.
461
461
462 .. sourcecode:: ipython
462 .. sourcecode:: ipython
463
463
464 In [58]: dview.scatter('a',range(16))
464 In [58]: dview.scatter('a',range(16))
465 Out[58]: [None,None,None,None]
465 Out[58]: [None,None,None,None]
466
466
467 In [59]: dview['a']
467 In [59]: dview['a']
468 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
468 Out[59]: [ [0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15] ]
469
469
470 In [60]: dview.gather('a')
470 In [60]: dview.gather('a')
471 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
471 Out[60]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
472
472
473 Other things to look at
473 Other things to look at
474 =======================
474 =======================
475
475
476 How to do parallel list comprehensions
476 How to do parallel list comprehensions
477 --------------------------------------
477 --------------------------------------
478
478
479 In many cases list comprehensions are nicer than using the map function. While
479 In many cases list comprehensions are nicer than using the map function. While
480 we don't have fully parallel list comprehensions, it is simple to get the
480 we don't have fully parallel list comprehensions, it is simple to get the
481 basic effect using :meth:`scatter` and :meth:`gather`:
481 basic effect using :meth:`scatter` and :meth:`gather`:
482
482
483 .. sourcecode:: ipython
483 .. sourcecode:: ipython
484
484
485 In [66]: dview.scatter('x',range(64))
485 In [66]: dview.scatter('x',range(64))
486
486
487 In [67]: %px y = [i**10 for i in x]
487 In [67]: %px y = [i**10 for i in x]
488 Parallel execution on engines: [0, 1, 2, 3]
488 Parallel execution on engines: [0, 1, 2, 3]
489
489
490 In [68]: y = dview.gather('y')
490 In [68]: y = dview.gather('y')
491
491
492 In [69]: print y
492 In [69]: print y
493 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
493 [0, 1, 1024, 59049, 1048576, 9765625, 60466176, 282475249, 1073741824,...]
494
494
495 Remote imports
495 Remote imports
496 --------------
496 --------------
497
497
498 Sometimes you will want to import packages both in your interactive session
498 Sometimes you will want to import packages both in your interactive session
499 and on your remote engines. This can be done with the :class:`ContextManager`
499 and on your remote engines. This can be done with the :class:`ContextManager`
500 created by a DirectView's :meth:`sync_imports` method:
500 created by a DirectView's :meth:`sync_imports` method:
501
501
502 .. sourcecode:: ipython
502 .. sourcecode:: ipython
503
503
504 In [69]: with dview.sync_imports():
504 In [69]: with dview.sync_imports():
505 ....: import numpy
505 ....: import numpy
506 importing numpy on engine(s)
506 importing numpy on engine(s)
507
507
508 Any imports made inside the block will also be performed on the view's engines.
508 Any imports made inside the block will also be performed on the view's engines.
509 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
509 sync_imports also takes a `local` boolean flag that defaults to True, which specifies
510 whether the local imports should also be performed. However, support for `local=False`
510 whether the local imports should also be performed. However, support for `local=False`
511 has not been implemented, so only packages that can be imported locally will work
511 has not been implemented, so only packages that can be imported locally will work
512 this way.
512 this way.
513
513
514 You can also specify imports via the ``@require`` decorator. This is a decorator
514 You can also specify imports via the ``@require`` decorator. This is a decorator
515 designed for use in Dependencies, but can be used to handle remote imports as well.
515 designed for use in Dependencies, but can be used to handle remote imports as well.
516 Modules or module names passed to ``@require`` will be imported before the decorated
516 Modules or module names passed to ``@require`` will be imported before the decorated
517 function is called. If they cannot be imported, the decorated function will never
517 function is called. If they cannot be imported, the decorated function will never
518 execute and will fail with an UnmetDependencyError. Failures of single Engines will
518 execute and will fail with an UnmetDependencyError. Failures of single Engines will
519 be collected and raise a CompositeError, as demonstrated in the next section.
519 be collected and raise a CompositeError, as demonstrated in the next section.
520
520
521 .. sourcecode:: ipython
521 .. sourcecode:: ipython
522
522
523 In [69]: from IPython.parallel import require
523 In [69]: from IPython.parallel import require
524
524
525 In [70]: @require('re'):
525 In [70]: @require('re'):
526 ....: def findall(pat, x):
526 ....: def findall(pat, x):
527 ....: # re is guaranteed to be available
527 ....: # re is guaranteed to be available
528 ....: return re.findall(pat, x)
528 ....: return re.findall(pat, x)
529
529
530 # you can also pass modules themselves, that you already have locally:
530 # you can also pass modules themselves, that you already have locally:
531 In [71]: @require(time):
531 In [71]: @require(time):
532 ....: def wait(t):
532 ....: def wait(t):
533 ....: time.sleep(t)
533 ....: time.sleep(t)
534 ....: return t
534 ....: return t
535
535
536 .. note::
536 .. note::
537
537
538 :func:`sync_imports` does not allow ``import foo as bar`` syntax,
538 :func:`sync_imports` does not allow ``import foo as bar`` syntax,
539 because the assignment represented by the ``as bar`` part is not
539 because the assignment represented by the ``as bar`` part is not
540 available to the import hook.
540 available to the import hook.
541
541
542
542
543 .. _parallel_exceptions:
543 .. _parallel_exceptions:
544
544
545 Parallel exceptions
545 Parallel exceptions
546 -------------------
546 -------------------
547
547
548 In the multiengine interface, parallel commands can raise Python exceptions,
548 In the multiengine interface, parallel commands can raise Python exceptions,
549 just like serial commands. But it is a little subtle, because a single
549 just like serial commands. But it is a little subtle, because a single
550 parallel command can actually raise multiple exceptions (one for each engine
550 parallel command can actually raise multiple exceptions (one for each engine
551 the command was run on). To express this idea, we have a
551 the command was run on). To express this idea, we have a
552 :exc:`CompositeError` exception class that will be raised in most cases. The
552 :exc:`CompositeError` exception class that will be raised in most cases. The
553 :exc:`CompositeError` class is a special type of exception that wraps one or
553 :exc:`CompositeError` class is a special type of exception that wraps one or
554 more other types of exceptions. Here is how it works:
554 more other types of exceptions. Here is how it works:
555
555
556 .. sourcecode:: ipython
556 .. sourcecode:: ipython
557
557
558 In [78]: dview.block = True
558 In [78]: dview.block = True
559
559
560 In [79]: dview.execute("1/0")
560 In [79]: dview.execute("1/0")
561 [0:execute]:
561 [0:execute]:
562 ---------------------------------------------------------------------------
562 ---------------------------------------------------------------------------
563 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
563 ZeroDivisionError Traceback (most recent call last)
564 ----> 1 1/0
564 ----> 1 1/0
565 ZeroDivisionError: integer division or modulo by zero
565 ZeroDivisionError: integer division or modulo by zero
566
566
567 [1:execute]:
567 [1:execute]:
568 ---------------------------------------------------------------------------
568 ---------------------------------------------------------------------------
569 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
569 ZeroDivisionError Traceback (most recent call last)
570 ----> 1 1/0
570 ----> 1 1/0
571 ZeroDivisionError: integer division or modulo by zero
571 ZeroDivisionError: integer division or modulo by zero
572
572
573 [2:execute]:
573 [2:execute]:
574 ---------------------------------------------------------------------------
574 ---------------------------------------------------------------------------
575 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
575 ZeroDivisionError Traceback (most recent call last)
576 ----> 1 1/0
576 ----> 1 1/0
577 ZeroDivisionError: integer division or modulo by zero
577 ZeroDivisionError: integer division or modulo by zero
578
578
579 [3:execute]:
579 [3:execute]:
580 ---------------------------------------------------------------------------
580 ---------------------------------------------------------------------------
581 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
581 ZeroDivisionError Traceback (most recent call last)
582 ----> 1 1/0
582 ----> 1 1/0
583 ZeroDivisionError: integer division or modulo by zero
583 ZeroDivisionError: integer division or modulo by zero
584
584
585 Notice how the error message printed when :exc:`CompositeError` is raised has
585 Notice how the error message printed when :exc:`CompositeError` is raised has
586 information about the individual exceptions that were raised on each engine.
586 information about the individual exceptions that were raised on each engine.
587 If you want, you can even raise one of these original exceptions:
587 If you want, you can even raise one of these original exceptions:
588
588
589 .. sourcecode:: ipython
589 .. sourcecode:: ipython
590
590
591 In [80]: try:
591 In [80]: try:
592 ....: dview.execute('1/0', block=True)
592 ....: dview.execute('1/0', block=True)
593 ....: except parallel.error.CompositeError, e:
593 ....: except parallel.error.CompositeError, e:
594 ....: e.raise_exception()
594 ....: e.raise_exception()
595 ....:
595 ....:
596 ....:
596 ....:
597 ---------------------------------------------------------------------------
597 ---------------------------------------------------------------------------
598 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
598 ZeroDivisionError Traceback (most recent call last)
599 ----> 1 1/0
599 ----> 1 1/0
600 ZeroDivisionError: integer division or modulo by zero
600 ZeroDivisionError: integer division or modulo by zero
601
601
602 If you are working in IPython, you can simple type ``%debug`` after one of
602 If you are working in IPython, you can simple type ``%debug`` after one of
603 these :exc:`CompositeError` exceptions is raised, and inspect the exception
603 these :exc:`CompositeError` exceptions is raised, and inspect the exception
604 instance:
604 instance:
605
605
606 .. sourcecode:: ipython
606 .. sourcecode:: ipython
607
607
608 In [81]: dview.execute('1/0')
608 In [81]: dview.execute('1/0')
609 [0:execute]:
609 [0:execute]:
610 ---------------------------------------------------------------------------
610 ---------------------------------------------------------------------------
611 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
611 ZeroDivisionError Traceback (most recent call last)
612 ----> 1 1/0
612 ----> 1 1/0
613 ZeroDivisionError: integer division or modulo by zero
613 ZeroDivisionError: integer division or modulo by zero
614
614
615 [1:execute]:
615 [1:execute]:
616 ---------------------------------------------------------------------------
616 ---------------------------------------------------------------------------
617 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
617 ZeroDivisionError Traceback (most recent call last)
618 ----> 1 1/0
618 ----> 1 1/0
619 ZeroDivisionError: integer division or modulo by zero
619 ZeroDivisionError: integer division or modulo by zero
620
620
621 [2:execute]:
621 [2:execute]:
622 ---------------------------------------------------------------------------
622 ---------------------------------------------------------------------------
623 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
623 ZeroDivisionError Traceback (most recent call last)
624 ----> 1 1/0
624 ----> 1 1/0
625 ZeroDivisionError: integer division or modulo by zero
625 ZeroDivisionError: integer division or modulo by zero
626
626
627 [3:execute]:
627 [3:execute]:
628 ---------------------------------------------------------------------------
628 ---------------------------------------------------------------------------
629 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
629 ZeroDivisionError Traceback (most recent call last)
630 ----> 1 1/0
630 ----> 1 1/0
631 ZeroDivisionError: integer division or modulo by zero
631 ZeroDivisionError: integer division or modulo by zero
632
632
633 In [82]: %debug
633 In [82]: %debug
634 > /.../site-packages/IPython/parallel/client/asyncresult.py(125)get()
634 > /.../site-packages/IPython/parallel/client/asyncresult.py(125)get()
635 124 else:
635 124 else:
636 --> 125 raise self._exception
636 --> 125 raise self._exception
637 126 else:
637 126 else:
638
638
639 # Here, self._exception is the CompositeError instance:
639 # Here, self._exception is the CompositeError instance:
640
640
641 ipdb> e = self._exception
641 ipdb> e = self._exception
642 ipdb> e
642 ipdb> e
643 CompositeError(4)
643 CompositeError(4)
644
644
645 # we can tab-complete on e to see available methods:
645 # we can tab-complete on e to see available methods:
646 ipdb> e.<TAB>
646 ipdb> e.<TAB>
647 e.args e.message e.traceback
647 e.args e.message e.traceback
648 e.elist e.msg
648 e.elist e.msg
649 e.ename e.print_traceback
649 e.ename e.print_traceback
650 e.engine_info e.raise_exception
650 e.engine_info e.raise_exception
651 e.evalue e.render_traceback
651 e.evalue e.render_traceback
652
652
653 # We can then display the individual tracebacks, if we want:
653 # We can then display the individual tracebacks, if we want:
654 ipdb> e.print_traceback(1)
654 ipdb> e.print_traceback(1)
655 [1:execute]:
655 [1:execute]:
656 ---------------------------------------------------------------------------
656 ---------------------------------------------------------------------------
657 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
657 ZeroDivisionError Traceback (most recent call last)
658 ----> 1 1/0
658 ----> 1 1/0
659 ZeroDivisionError: integer division or modulo by zero
659 ZeroDivisionError: integer division or modulo by zero
660
660
661
661
662 Since you might have 100 engines, you probably don't want to see 100 tracebacks
663 for a simple NameError because of a typo.
664 For this reason, CompositeError truncates the list of exceptions it will print
665 to :attr:`CompositeError.tb_limit` (default is five).
666 You can change this limit to suit your needs with:
667
668 .. sourcecode:: ipython
669
670 In [20]: from IPython.parallel import CompositeError
671 In [21]: CompositeError.tb_limit = 1
672 In [22]: %px a=b
673 [0:execute]:
674 ---------------------------------------------------------------------------
675 NameError Traceback (most recent call last)
676 ----> 1 a=b
677 NameError: name 'b' is not defined
678
679 ... 3 more exceptions ...
680
681
662 All of this same error handling magic even works in non-blocking mode:
682 All of this same error handling magic even works in non-blocking mode:
663
683
664 .. sourcecode:: ipython
684 .. sourcecode:: ipython
665
685
666 In [83]: dview.block=False
686 In [83]: dview.block=False
667
687
668 In [84]: ar = dview.execute('1/0')
688 In [84]: ar = dview.execute('1/0')
669
689
670 In [85]: ar.get()
690 In [85]: ar.get()
671 [0:execute]:
691 [0:execute]:
672 ---------------------------------------------------------------------------
692 ---------------------------------------------------------------------------
673 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
693 ZeroDivisionError Traceback (most recent call last)
674 ----> 1 1/0
675 ZeroDivisionError: integer division or modulo by zero
676
677 [1:execute]:
678 ---------------------------------------------------------------------------
679 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
680 ----> 1 1/0
694 ----> 1 1/0
681 ZeroDivisionError: integer division or modulo by zero
695 ZeroDivisionError: integer division or modulo by zero
682
696
683 [2:execute]:
697 ... 3 more exceptions ...
684 ---------------------------------------------------------------------------
685 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
686 ----> 1 1/0
687 ZeroDivisionError: integer division or modulo by zero
688
689 [3:execute]:
690 ---------------------------------------------------------------------------
691 ZeroDivisionError Traceback (most recent call last)<ipython-input-1-05c9758a9c21> in <module>()
692 ----> 1 1/0
693 ZeroDivisionError: integer division or modulo by zero
694
General Comments 0
You need to be logged in to leave comments. Login now