##// END OF EJS Templates
truncate potentially long CompositeErrors...
MinRK -
Show More
@@ -1,346 +1,354 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """Classes and functions for kernel related errors and exceptions.
3 """Classes and functions for kernel related errors and exceptions.
4
4
5 Inheritance diagram:
5 Inheritance diagram:
6
6
7 .. inheritance-diagram:: IPython.parallel.error
7 .. inheritance-diagram:: IPython.parallel.error
8 :parts: 3
8 :parts: 3
9
9
10 Authors:
10 Authors:
11
11
12 * Brian Granger
12 * Brian Granger
13 * Min RK
13 * Min RK
14 """
14 """
15 from __future__ import print_function
15 from __future__ import print_function
16
16
17 import sys
17 import sys
18 import traceback
18 import traceback
19
19
20 __docformat__ = "restructuredtext en"
20 __docformat__ = "restructuredtext en"
21
21
22 # Tell nose to skip this module
22 # Tell nose to skip this module
23 __test__ = {}
23 __test__ = {}
24
24
25 #-------------------------------------------------------------------------------
25 #-------------------------------------------------------------------------------
26 # Copyright (C) 2008-2011 The IPython Development Team
26 # Copyright (C) 2008-2011 The IPython Development Team
27 #
27 #
28 # Distributed under the terms of the BSD License. The full license is in
28 # Distributed under the terms of the BSD License. The full license is in
29 # the file COPYING, distributed as part of this software.
29 # the file COPYING, distributed as part of this software.
30 #-------------------------------------------------------------------------------
30 #-------------------------------------------------------------------------------
31
31
32 #-------------------------------------------------------------------------------
32 #-------------------------------------------------------------------------------
33 # Error classes
33 # Error classes
34 #-------------------------------------------------------------------------------
34 #-------------------------------------------------------------------------------
35 class IPythonError(Exception):
35 class IPythonError(Exception):
36 """Base exception that all of our exceptions inherit from.
36 """Base exception that all of our exceptions inherit from.
37
37
38 This can be raised by code that doesn't have any more specific
38 This can be raised by code that doesn't have any more specific
39 information."""
39 information."""
40
40
41 pass
41 pass
42
42
43 # Exceptions associated with the controller objects
43 # Exceptions associated with the controller objects
44 class ControllerError(IPythonError): pass
44 class ControllerError(IPythonError): pass
45
45
46 class ControllerCreationError(ControllerError): pass
46 class ControllerCreationError(ControllerError): pass
47
47
48
48
49 # Exceptions associated with the Engines
49 # Exceptions associated with the Engines
50 class EngineError(IPythonError): pass
50 class EngineError(IPythonError): pass
51
51
52 class EngineCreationError(EngineError): pass
52 class EngineCreationError(EngineError): pass
53
53
54 class KernelError(IPythonError):
54 class KernelError(IPythonError):
55 pass
55 pass
56
56
57 class NotDefined(KernelError):
57 class NotDefined(KernelError):
58 def __init__(self, name):
58 def __init__(self, name):
59 self.name = name
59 self.name = name
60 self.args = (name,)
60 self.args = (name,)
61
61
62 def __repr__(self):
62 def __repr__(self):
63 return '<NotDefined: %s>' % self.name
63 return '<NotDefined: %s>' % self.name
64
64
65 __str__ = __repr__
65 __str__ = __repr__
66
66
67
67
68 class QueueCleared(KernelError):
68 class QueueCleared(KernelError):
69 pass
69 pass
70
70
71
71
72 class IdInUse(KernelError):
72 class IdInUse(KernelError):
73 pass
73 pass
74
74
75
75
76 class ProtocolError(KernelError):
76 class ProtocolError(KernelError):
77 pass
77 pass
78
78
79
79
80 class ConnectionError(KernelError):
80 class ConnectionError(KernelError):
81 pass
81 pass
82
82
83
83
84 class InvalidEngineID(KernelError):
84 class InvalidEngineID(KernelError):
85 pass
85 pass
86
86
87
87
88 class NoEnginesRegistered(KernelError):
88 class NoEnginesRegistered(KernelError):
89 pass
89 pass
90
90
91
91
92 class InvalidClientID(KernelError):
92 class InvalidClientID(KernelError):
93 pass
93 pass
94
94
95
95
96 class InvalidDeferredID(KernelError):
96 class InvalidDeferredID(KernelError):
97 pass
97 pass
98
98
99
99
100 class SerializationError(KernelError):
100 class SerializationError(KernelError):
101 pass
101 pass
102
102
103
103
104 class MessageSizeError(KernelError):
104 class MessageSizeError(KernelError):
105 pass
105 pass
106
106
107
107
108 class PBMessageSizeError(MessageSizeError):
108 class PBMessageSizeError(MessageSizeError):
109 pass
109 pass
110
110
111
111
112 class ResultNotCompleted(KernelError):
112 class ResultNotCompleted(KernelError):
113 pass
113 pass
114
114
115
115
116 class ResultAlreadyRetrieved(KernelError):
116 class ResultAlreadyRetrieved(KernelError):
117 pass
117 pass
118
118
119 class ClientError(KernelError):
119 class ClientError(KernelError):
120 pass
120 pass
121
121
122
122
123 class TaskAborted(KernelError):
123 class TaskAborted(KernelError):
124 pass
124 pass
125
125
126
126
127 class TaskTimeout(KernelError):
127 class TaskTimeout(KernelError):
128 pass
128 pass
129
129
130
130
131 class NotAPendingResult(KernelError):
131 class NotAPendingResult(KernelError):
132 pass
132 pass
133
133
134
134
135 class UnpickleableException(KernelError):
135 class UnpickleableException(KernelError):
136 pass
136 pass
137
137
138
138
139 class AbortedPendingDeferredError(KernelError):
139 class AbortedPendingDeferredError(KernelError):
140 pass
140 pass
141
141
142
142
143 class InvalidProperty(KernelError):
143 class InvalidProperty(KernelError):
144 pass
144 pass
145
145
146
146
147 class MissingBlockArgument(KernelError):
147 class MissingBlockArgument(KernelError):
148 pass
148 pass
149
149
150
150
151 class StopLocalExecution(KernelError):
151 class StopLocalExecution(KernelError):
152 pass
152 pass
153
153
154
154
155 class SecurityError(KernelError):
155 class SecurityError(KernelError):
156 pass
156 pass
157
157
158
158
159 class FileTimeoutError(KernelError):
159 class FileTimeoutError(KernelError):
160 pass
160 pass
161
161
162 class TimeoutError(KernelError):
162 class TimeoutError(KernelError):
163 pass
163 pass
164
164
165 class UnmetDependency(KernelError):
165 class UnmetDependency(KernelError):
166 pass
166 pass
167
167
168 class ImpossibleDependency(UnmetDependency):
168 class ImpossibleDependency(UnmetDependency):
169 pass
169 pass
170
170
171 class DependencyTimeout(ImpossibleDependency):
171 class DependencyTimeout(ImpossibleDependency):
172 pass
172 pass
173
173
174 class InvalidDependency(ImpossibleDependency):
174 class InvalidDependency(ImpossibleDependency):
175 pass
175 pass
176
176
177 class RemoteError(KernelError):
177 class RemoteError(KernelError):
178 """Error raised elsewhere"""
178 """Error raised elsewhere"""
179 ename=None
179 ename=None
180 evalue=None
180 evalue=None
181 traceback=None
181 traceback=None
182 engine_info=None
182 engine_info=None
183
183
184 def __init__(self, ename, evalue, traceback, engine_info=None):
184 def __init__(self, ename, evalue, traceback, engine_info=None):
185 self.ename=ename
185 self.ename=ename
186 self.evalue=evalue
186 self.evalue=evalue
187 self.traceback=traceback
187 self.traceback=traceback
188 self.engine_info=engine_info or {}
188 self.engine_info=engine_info or {}
189 self.args=(ename, evalue)
189 self.args=(ename, evalue)
190
190
191 def __repr__(self):
191 def __repr__(self):
192 engineid = self.engine_info.get('engine_id', ' ')
192 engineid = self.engine_info.get('engine_id', ' ')
193 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
193 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
194
194
195 def __str__(self):
195 def __str__(self):
196 return "%s(%s)" % (self.ename, self.evalue)
196 return "%s(%s)" % (self.ename, self.evalue)
197
197
198 def render_traceback(self):
198 def render_traceback(self):
199 """render traceback to a list of lines"""
199 """render traceback to a list of lines"""
200 return (self.traceback or "No traceback available").splitlines()
200 return (self.traceback or "No traceback available").splitlines()
201
201
202 def _render_traceback_(self):
202 def _render_traceback_(self):
203 """Special method for custom tracebacks within IPython.
203 """Special method for custom tracebacks within IPython.
204
204
205 This will be called by IPython instead of displaying the local traceback.
205 This will be called by IPython instead of displaying the local traceback.
206
206
207 It should return a traceback rendered as a list of lines.
207 It should return a traceback rendered as a list of lines.
208 """
208 """
209 return self.render_traceback()
209 return self.render_traceback()
210
210
211 def print_traceback(self, excid=None):
211 def print_traceback(self, excid=None):
212 """print my traceback"""
212 """print my traceback"""
213 print('\n'.join(self.render_traceback()))
213 print('\n'.join(self.render_traceback()))
214
214
215
215
216
216
217
217
218 class TaskRejectError(KernelError):
218 class TaskRejectError(KernelError):
219 """Exception to raise when a task should be rejected by an engine.
219 """Exception to raise when a task should be rejected by an engine.
220
220
221 This exception can be used to allow a task running on an engine to test
221 This exception can be used to allow a task running on an engine to test
222 if the engine (or the user's namespace on the engine) has the needed
222 if the engine (or the user's namespace on the engine) has the needed
223 task dependencies. If not, the task should raise this exception. For
223 task dependencies. If not, the task should raise this exception. For
224 the task to be retried on another engine, the task should be created
224 the task to be retried on another engine, the task should be created
225 with the `retries` argument > 1.
225 with the `retries` argument > 1.
226
226
227 The advantage of this approach over our older properties system is that
227 The advantage of this approach over our older properties system is that
228 tasks have full access to the user's namespace on the engines and the
228 tasks have full access to the user's namespace on the engines and the
229 properties don't have to be managed or tested by the controller.
229 properties don't have to be managed or tested by the controller.
230 """
230 """
231
231
232
232
233 class CompositeError(RemoteError):
233 class CompositeError(RemoteError):
234 """Error for representing possibly multiple errors on engines"""
234 """Error for representing possibly multiple errors on engines"""
235 tb_limit = 4 # limit on how many tracebacks to draw
236
235 def __init__(self, message, elist):
237 def __init__(self, message, elist):
236 Exception.__init__(self, *(message, elist))
238 Exception.__init__(self, *(message, elist))
237 # Don't use pack_exception because it will conflict with the .message
239 # Don't use pack_exception because it will conflict with the .message
238 # attribute that is being deprecated in 2.6 and beyond.
240 # attribute that is being deprecated in 2.6 and beyond.
239 self.msg = message
241 self.msg = message
240 self.elist = elist
242 self.elist = elist
241 self.args = [ e[0] for e in elist ]
243 self.args = [ e[0] for e in elist ]
242
244
243 def _get_engine_str(self, ei):
245 def _get_engine_str(self, ei):
244 if not ei:
246 if not ei:
245 return '[Engine Exception]'
247 return '[Engine Exception]'
246 else:
248 else:
247 return '[%s:%s]: ' % (ei['engine_id'], ei['method'])
249 return '[%s:%s]: ' % (ei['engine_id'], ei['method'])
248
250
249 def _get_traceback(self, ev):
251 def _get_traceback(self, ev):
250 try:
252 try:
251 tb = ev._ipython_traceback_text
253 tb = ev._ipython_traceback_text
252 except AttributeError:
254 except AttributeError:
253 return 'No traceback available'
255 return 'No traceback available'
254 else:
256 else:
255 return tb
257 return tb
256
258
257 def __str__(self):
259 def __str__(self):
258 s = str(self.msg)
260 s = str(self.msg)
259 for en, ev, etb, ei in self.elist:
261 for en, ev, etb, ei in self.elist[:self.tb_limit]:
260 engine_str = self._get_engine_str(ei)
262 engine_str = self._get_engine_str(ei)
261 s = s + '\n' + engine_str + en + ': ' + str(ev)
263 s = s + '\n' + engine_str + en + ': ' + str(ev)
264 if len(self.elist) > self.tb_limit:
265 s = s + '\n.... %i more exceptions ...' % (len(self.elist) - self.tb_limit)
262 return s
266 return s
263
267
264 def __repr__(self):
268 def __repr__(self):
265 return "CompositeError(%i)"%len(self.elist)
269 return "CompositeError(%i)" % len(self.elist)
266
270
267 def render_traceback(self, excid=None):
271 def render_traceback(self, excid=None):
268 """render one or all of my tracebacks to a list of lines"""
272 """render one or all of my tracebacks to a list of lines"""
269 lines = []
273 lines = []
270 if excid is None:
274 if excid is None:
271 for (en,ev,etb,ei) in self.elist:
275 for (en,ev,etb,ei) in self.elist[:self.tb_limit]:
272 lines.append(self._get_engine_str(ei))
276 lines.append(self._get_engine_str(ei))
273 lines.extend((etb or 'No traceback available').splitlines())
277 lines.extend((etb or 'No traceback available').splitlines())
274 lines.append('')
278 lines.append('')
279 if len(self.elist) > self.tb_limit:
280 lines.append(
281 '... %i more exceptions ...' % (len(self.elist) - self.tb_limit)
282 )
275 else:
283 else:
276 try:
284 try:
277 en,ev,etb,ei = self.elist[excid]
285 en,ev,etb,ei = self.elist[excid]
278 except:
286 except:
279 raise IndexError("an exception with index %i does not exist"%excid)
287 raise IndexError("an exception with index %i does not exist"%excid)
280 else:
288 else:
281 lines.append(self._get_engine_str(ei))
289 lines.append(self._get_engine_str(ei))
282 lines.extend((etb or 'No traceback available').splitlines())
290 lines.extend((etb or 'No traceback available').splitlines())
283
291
284 return lines
292 return lines
285
293
286 def print_traceback(self, excid=None):
294 def print_traceback(self, excid=None):
287 print('\n'.join(self.render_traceback(excid)))
295 print('\n'.join(self.render_traceback(excid)))
288
296
289 def raise_exception(self, excid=0):
297 def raise_exception(self, excid=0):
290 try:
298 try:
291 en,ev,etb,ei = self.elist[excid]
299 en,ev,etb,ei = self.elist[excid]
292 except:
300 except:
293 raise IndexError("an exception with index %i does not exist"%excid)
301 raise IndexError("an exception with index %i does not exist"%excid)
294 else:
302 else:
295 raise RemoteError(en, ev, etb, ei)
303 raise RemoteError(en, ev, etb, ei)
296
304
297
305
298 def collect_exceptions(rdict_or_list, method='unspecified'):
306 def collect_exceptions(rdict_or_list, method='unspecified'):
299 """check a result dict for errors, and raise CompositeError if any exist.
307 """check a result dict for errors, and raise CompositeError if any exist.
300 Passthrough otherwise."""
308 Passthrough otherwise."""
301 elist = []
309 elist = []
302 if isinstance(rdict_or_list, dict):
310 if isinstance(rdict_or_list, dict):
303 rlist = rdict_or_list.values()
311 rlist = rdict_or_list.values()
304 else:
312 else:
305 rlist = rdict_or_list
313 rlist = rdict_or_list
306 for r in rlist:
314 for r in rlist:
307 if isinstance(r, RemoteError):
315 if isinstance(r, RemoteError):
308 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
316 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
309 # Sometimes we could have CompositeError in our list. Just take
317 # Sometimes we could have CompositeError in our list. Just take
310 # the errors out of them and put them in our new list. This
318 # the errors out of them and put them in our new list. This
311 # has the effect of flattening lists of CompositeErrors into one
319 # has the effect of flattening lists of CompositeErrors into one
312 # CompositeError
320 # CompositeError
313 if en=='CompositeError':
321 if en=='CompositeError':
314 for e in ev.elist:
322 for e in ev.elist:
315 elist.append(e)
323 elist.append(e)
316 else:
324 else:
317 elist.append((en, ev, etb, ei))
325 elist.append((en, ev, etb, ei))
318 if len(elist)==0:
326 if len(elist)==0:
319 return rdict_or_list
327 return rdict_or_list
320 else:
328 else:
321 msg = "one or more exceptions from call to method: %s" % (method)
329 msg = "one or more exceptions from call to method: %s" % (method)
322 # This silliness is needed so the debugger has access to the exception
330 # This silliness is needed so the debugger has access to the exception
323 # instance (e in this case)
331 # instance (e in this case)
324 try:
332 try:
325 raise CompositeError(msg, elist)
333 raise CompositeError(msg, elist)
326 except CompositeError as e:
334 except CompositeError as e:
327 raise e
335 raise e
328
336
329 def wrap_exception(engine_info={}):
337 def wrap_exception(engine_info={}):
330 etype, evalue, tb = sys.exc_info()
338 etype, evalue, tb = sys.exc_info()
331 stb = traceback.format_exception(etype, evalue, tb)
339 stb = traceback.format_exception(etype, evalue, tb)
332 exc_content = {
340 exc_content = {
333 'status' : 'error',
341 'status' : 'error',
334 'traceback' : stb,
342 'traceback' : stb,
335 'ename' : unicode(etype.__name__),
343 'ename' : unicode(etype.__name__),
336 'evalue' : unicode(evalue),
344 'evalue' : unicode(evalue),
337 'engine_info' : engine_info
345 'engine_info' : engine_info
338 }
346 }
339 return exc_content
347 return exc_content
340
348
341 def unwrap_exception(content):
349 def unwrap_exception(content):
342 err = RemoteError(content['ename'], content['evalue'],
350 err = RemoteError(content['ename'], content['evalue'],
343 ''.join(content['traceback']),
351 ''.join(content['traceback']),
344 content.get('engine_info', {}))
352 content.get('engine_info', {}))
345 return err
353 return err
346
354
@@ -1,737 +1,764 b''
1 # -*- coding: utf-8 -*-
1 # -*- coding: utf-8 -*-
2 """test View objects
2 """test View objects
3
3
4 Authors:
4 Authors:
5
5
6 * Min RK
6 * Min RK
7 """
7 """
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 import sys
19 import sys
20 import platform
20 import platform
21 import time
21 import time
22 from tempfile import mktemp
22 from tempfile import mktemp
23 from StringIO import StringIO
23 from StringIO import StringIO
24
24
25 import zmq
25 import zmq
26 from nose import SkipTest
26 from nose import SkipTest
27 from nose.plugins.attrib import attr
27 from nose.plugins.attrib import attr
28
28
29 from IPython.testing import decorators as dec
29 from IPython.testing import decorators as dec
30 from IPython.testing.ipunittest import ParametricTestCase
30 from IPython.testing.ipunittest import ParametricTestCase
31 from IPython.utils.io import capture_output
31 from IPython.utils.io import capture_output
32
32
33 from IPython import parallel as pmod
33 from IPython import parallel as pmod
34 from IPython.parallel import error
34 from IPython.parallel import error
35 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
35 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
36 from IPython.parallel import DirectView
36 from IPython.parallel import DirectView
37 from IPython.parallel.util import interactive
37 from IPython.parallel.util import interactive
38
38
39 from IPython.parallel.tests import add_engines
39 from IPython.parallel.tests import add_engines
40
40
41 from .clienttest import ClusterTestCase, crash, wait, skip_without
41 from .clienttest import ClusterTestCase, crash, wait, skip_without
42
42
43 def setup():
43 def setup():
44 add_engines(3, total=True)
44 add_engines(3, total=True)
45
45
46 class TestView(ClusterTestCase, ParametricTestCase):
46 class TestView(ClusterTestCase, ParametricTestCase):
47
47
48 def setUp(self):
48 def setUp(self):
49 # On Win XP, wait for resource cleanup, else parallel test group fails
49 # On Win XP, wait for resource cleanup, else parallel test group fails
50 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
50 if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
51 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
51 # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
52 time.sleep(2)
52 time.sleep(2)
53 super(TestView, self).setUp()
53 super(TestView, self).setUp()
54
54
55 @attr('crash')
55 @attr('crash')
56 def test_z_crash_mux(self):
56 def test_z_crash_mux(self):
57 """test graceful handling of engine death (direct)"""
57 """test graceful handling of engine death (direct)"""
58 # self.add_engines(1)
58 # self.add_engines(1)
59 eid = self.client.ids[-1]
59 eid = self.client.ids[-1]
60 ar = self.client[eid].apply_async(crash)
60 ar = self.client[eid].apply_async(crash)
61 self.assertRaisesRemote(error.EngineError, ar.get, 10)
61 self.assertRaisesRemote(error.EngineError, ar.get, 10)
62 eid = ar.engine_id
62 eid = ar.engine_id
63 tic = time.time()
63 tic = time.time()
64 while eid in self.client.ids and time.time()-tic < 5:
64 while eid in self.client.ids and time.time()-tic < 5:
65 time.sleep(.01)
65 time.sleep(.01)
66 self.client.spin()
66 self.client.spin()
67 self.assertFalse(eid in self.client.ids, "Engine should have died")
67 self.assertFalse(eid in self.client.ids, "Engine should have died")
68
68
69 def test_push_pull(self):
69 def test_push_pull(self):
70 """test pushing and pulling"""
70 """test pushing and pulling"""
71 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
71 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
72 t = self.client.ids[-1]
72 t = self.client.ids[-1]
73 v = self.client[t]
73 v = self.client[t]
74 push = v.push
74 push = v.push
75 pull = v.pull
75 pull = v.pull
76 v.block=True
76 v.block=True
77 nengines = len(self.client)
77 nengines = len(self.client)
78 push({'data':data})
78 push({'data':data})
79 d = pull('data')
79 d = pull('data')
80 self.assertEqual(d, data)
80 self.assertEqual(d, data)
81 self.client[:].push({'data':data})
81 self.client[:].push({'data':data})
82 d = self.client[:].pull('data', block=True)
82 d = self.client[:].pull('data', block=True)
83 self.assertEqual(d, nengines*[data])
83 self.assertEqual(d, nengines*[data])
84 ar = push({'data':data}, block=False)
84 ar = push({'data':data}, block=False)
85 self.assertTrue(isinstance(ar, AsyncResult))
85 self.assertTrue(isinstance(ar, AsyncResult))
86 r = ar.get()
86 r = ar.get()
87 ar = self.client[:].pull('data', block=False)
87 ar = self.client[:].pull('data', block=False)
88 self.assertTrue(isinstance(ar, AsyncResult))
88 self.assertTrue(isinstance(ar, AsyncResult))
89 r = ar.get()
89 r = ar.get()
90 self.assertEqual(r, nengines*[data])
90 self.assertEqual(r, nengines*[data])
91 self.client[:].push(dict(a=10,b=20))
91 self.client[:].push(dict(a=10,b=20))
92 r = self.client[:].pull(('a','b'), block=True)
92 r = self.client[:].pull(('a','b'), block=True)
93 self.assertEqual(r, nengines*[[10,20]])
93 self.assertEqual(r, nengines*[[10,20]])
94
94
95 def test_push_pull_function(self):
95 def test_push_pull_function(self):
96 "test pushing and pulling functions"
96 "test pushing and pulling functions"
97 def testf(x):
97 def testf(x):
98 return 2.0*x
98 return 2.0*x
99
99
100 t = self.client.ids[-1]
100 t = self.client.ids[-1]
101 v = self.client[t]
101 v = self.client[t]
102 v.block=True
102 v.block=True
103 push = v.push
103 push = v.push
104 pull = v.pull
104 pull = v.pull
105 execute = v.execute
105 execute = v.execute
106 push({'testf':testf})
106 push({'testf':testf})
107 r = pull('testf')
107 r = pull('testf')
108 self.assertEqual(r(1.0), testf(1.0))
108 self.assertEqual(r(1.0), testf(1.0))
109 execute('r = testf(10)')
109 execute('r = testf(10)')
110 r = pull('r')
110 r = pull('r')
111 self.assertEqual(r, testf(10))
111 self.assertEqual(r, testf(10))
112 ar = self.client[:].push({'testf':testf}, block=False)
112 ar = self.client[:].push({'testf':testf}, block=False)
113 ar.get()
113 ar.get()
114 ar = self.client[:].pull('testf', block=False)
114 ar = self.client[:].pull('testf', block=False)
115 rlist = ar.get()
115 rlist = ar.get()
116 for r in rlist:
116 for r in rlist:
117 self.assertEqual(r(1.0), testf(1.0))
117 self.assertEqual(r(1.0), testf(1.0))
118 execute("def g(x): return x*x")
118 execute("def g(x): return x*x")
119 r = pull(('testf','g'))
119 r = pull(('testf','g'))
120 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
120 self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))
121
121
122 def test_push_function_globals(self):
122 def test_push_function_globals(self):
123 """test that pushed functions have access to globals"""
123 """test that pushed functions have access to globals"""
124 @interactive
124 @interactive
125 def geta():
125 def geta():
126 return a
126 return a
127 # self.add_engines(1)
127 # self.add_engines(1)
128 v = self.client[-1]
128 v = self.client[-1]
129 v.block=True
129 v.block=True
130 v['f'] = geta
130 v['f'] = geta
131 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
131 self.assertRaisesRemote(NameError, v.execute, 'b=f()')
132 v.execute('a=5')
132 v.execute('a=5')
133 v.execute('b=f()')
133 v.execute('b=f()')
134 self.assertEqual(v['b'], 5)
134 self.assertEqual(v['b'], 5)
135
135
136 def test_push_function_defaults(self):
136 def test_push_function_defaults(self):
137 """test that pushed functions preserve default args"""
137 """test that pushed functions preserve default args"""
138 def echo(a=10):
138 def echo(a=10):
139 return a
139 return a
140 v = self.client[-1]
140 v = self.client[-1]
141 v.block=True
141 v.block=True
142 v['f'] = echo
142 v['f'] = echo
143 v.execute('b=f()')
143 v.execute('b=f()')
144 self.assertEqual(v['b'], 10)
144 self.assertEqual(v['b'], 10)
145
145
146 def test_get_result(self):
146 def test_get_result(self):
147 """test getting results from the Hub."""
147 """test getting results from the Hub."""
148 c = pmod.Client(profile='iptest')
148 c = pmod.Client(profile='iptest')
149 # self.add_engines(1)
149 # self.add_engines(1)
150 t = c.ids[-1]
150 t = c.ids[-1]
151 v = c[t]
151 v = c[t]
152 v2 = self.client[t]
152 v2 = self.client[t]
153 ar = v.apply_async(wait, 1)
153 ar = v.apply_async(wait, 1)
154 # give the monitor time to notice the message
154 # give the monitor time to notice the message
155 time.sleep(.25)
155 time.sleep(.25)
156 ahr = v2.get_result(ar.msg_ids)
156 ahr = v2.get_result(ar.msg_ids)
157 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 self.assertTrue(isinstance(ahr, AsyncHubResult))
158 self.assertEqual(ahr.get(), ar.get())
158 self.assertEqual(ahr.get(), ar.get())
159 ar2 = v2.get_result(ar.msg_ids)
159 ar2 = v2.get_result(ar.msg_ids)
160 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 self.assertFalse(isinstance(ar2, AsyncHubResult))
161 c.spin()
161 c.spin()
162 c.close()
162 c.close()
163
163
164 def test_run_newline(self):
164 def test_run_newline(self):
165 """test that run appends newline to files"""
165 """test that run appends newline to files"""
166 tmpfile = mktemp()
166 tmpfile = mktemp()
167 with open(tmpfile, 'w') as f:
167 with open(tmpfile, 'w') as f:
168 f.write("""def g():
168 f.write("""def g():
169 return 5
169 return 5
170 """)
170 """)
171 v = self.client[-1]
171 v = self.client[-1]
172 v.run(tmpfile, block=True)
172 v.run(tmpfile, block=True)
173 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
173 self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)
174
174
175 def test_apply_tracked(self):
175 def test_apply_tracked(self):
176 """test tracking for apply"""
176 """test tracking for apply"""
177 # self.add_engines(1)
177 # self.add_engines(1)
178 t = self.client.ids[-1]
178 t = self.client.ids[-1]
179 v = self.client[t]
179 v = self.client[t]
180 v.block=False
180 v.block=False
181 def echo(n=1024*1024, **kwargs):
181 def echo(n=1024*1024, **kwargs):
182 with v.temp_flags(**kwargs):
182 with v.temp_flags(**kwargs):
183 return v.apply(lambda x: x, 'x'*n)
183 return v.apply(lambda x: x, 'x'*n)
184 ar = echo(1, track=False)
184 ar = echo(1, track=False)
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
185 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
186 self.assertTrue(ar.sent)
186 self.assertTrue(ar.sent)
187 ar = echo(track=True)
187 ar = echo(track=True)
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
188 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
189 self.assertEqual(ar.sent, ar._tracker.done)
189 self.assertEqual(ar.sent, ar._tracker.done)
190 ar._tracker.wait()
190 ar._tracker.wait()
191 self.assertTrue(ar.sent)
191 self.assertTrue(ar.sent)
192
192
193 def test_push_tracked(self):
193 def test_push_tracked(self):
194 t = self.client.ids[-1]
194 t = self.client.ids[-1]
195 ns = dict(x='x'*1024*1024)
195 ns = dict(x='x'*1024*1024)
196 v = self.client[t]
196 v = self.client[t]
197 ar = v.push(ns, block=False, track=False)
197 ar = v.push(ns, block=False, track=False)
198 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
198 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
199 self.assertTrue(ar.sent)
199 self.assertTrue(ar.sent)
200
200
201 ar = v.push(ns, block=False, track=True)
201 ar = v.push(ns, block=False, track=True)
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
202 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
203 ar._tracker.wait()
203 ar._tracker.wait()
204 self.assertEqual(ar.sent, ar._tracker.done)
204 self.assertEqual(ar.sent, ar._tracker.done)
205 self.assertTrue(ar.sent)
205 self.assertTrue(ar.sent)
206 ar.get()
206 ar.get()
207
207
208 def test_scatter_tracked(self):
208 def test_scatter_tracked(self):
209 t = self.client.ids
209 t = self.client.ids
210 x='x'*1024*1024
210 x='x'*1024*1024
211 ar = self.client[t].scatter('x', x, block=False, track=False)
211 ar = self.client[t].scatter('x', x, block=False, track=False)
212 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
212 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
213 self.assertTrue(ar.sent)
213 self.assertTrue(ar.sent)
214
214
215 ar = self.client[t].scatter('x', x, block=False, track=True)
215 ar = self.client[t].scatter('x', x, block=False, track=True)
216 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
216 self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
217 self.assertEqual(ar.sent, ar._tracker.done)
217 self.assertEqual(ar.sent, ar._tracker.done)
218 ar._tracker.wait()
218 ar._tracker.wait()
219 self.assertTrue(ar.sent)
219 self.assertTrue(ar.sent)
220 ar.get()
220 ar.get()
221
221
222 def test_remote_reference(self):
222 def test_remote_reference(self):
223 v = self.client[-1]
223 v = self.client[-1]
224 v['a'] = 123
224 v['a'] = 123
225 ra = pmod.Reference('a')
225 ra = pmod.Reference('a')
226 b = v.apply_sync(lambda x: x, ra)
226 b = v.apply_sync(lambda x: x, ra)
227 self.assertEqual(b, 123)
227 self.assertEqual(b, 123)
228
228
229
229
230 def test_scatter_gather(self):
230 def test_scatter_gather(self):
231 view = self.client[:]
231 view = self.client[:]
232 seq1 = range(16)
232 seq1 = range(16)
233 view.scatter('a', seq1)
233 view.scatter('a', seq1)
234 seq2 = view.gather('a', block=True)
234 seq2 = view.gather('a', block=True)
235 self.assertEqual(seq2, seq1)
235 self.assertEqual(seq2, seq1)
236 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
236 self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)
237
237
238 @skip_without('numpy')
238 @skip_without('numpy')
239 def test_scatter_gather_numpy(self):
239 def test_scatter_gather_numpy(self):
240 import numpy
240 import numpy
241 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
241 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
242 view = self.client[:]
242 view = self.client[:]
243 a = numpy.arange(64)
243 a = numpy.arange(64)
244 view.scatter('a', a, block=True)
244 view.scatter('a', a, block=True)
245 b = view.gather('a', block=True)
245 b = view.gather('a', block=True)
246 assert_array_equal(b, a)
246 assert_array_equal(b, a)
247
247
248 def test_scatter_gather_lazy(self):
248 def test_scatter_gather_lazy(self):
249 """scatter/gather with targets='all'"""
249 """scatter/gather with targets='all'"""
250 view = self.client.direct_view(targets='all')
250 view = self.client.direct_view(targets='all')
251 x = range(64)
251 x = range(64)
252 view.scatter('x', x)
252 view.scatter('x', x)
253 gathered = view.gather('x', block=True)
253 gathered = view.gather('x', block=True)
254 self.assertEqual(gathered, x)
254 self.assertEqual(gathered, x)
255
255
256
256
257 @dec.known_failure_py3
257 @dec.known_failure_py3
258 @skip_without('numpy')
258 @skip_without('numpy')
259 def test_push_numpy_nocopy(self):
259 def test_push_numpy_nocopy(self):
260 import numpy
260 import numpy
261 view = self.client[:]
261 view = self.client[:]
262 a = numpy.arange(64)
262 a = numpy.arange(64)
263 view['A'] = a
263 view['A'] = a
264 @interactive
264 @interactive
265 def check_writeable(x):
265 def check_writeable(x):
266 return x.flags.writeable
266 return x.flags.writeable
267
267
268 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
268 for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
269 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
269 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
270
270
271 view.push(dict(B=a))
271 view.push(dict(B=a))
272 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
272 for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
273 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
273 self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")
274
274
275 @skip_without('numpy')
275 @skip_without('numpy')
276 def test_apply_numpy(self):
276 def test_apply_numpy(self):
277 """view.apply(f, ndarray)"""
277 """view.apply(f, ndarray)"""
278 import numpy
278 import numpy
279 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
279 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
280
280
281 A = numpy.random.random((100,100))
281 A = numpy.random.random((100,100))
282 view = self.client[-1]
282 view = self.client[-1]
283 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
283 for dt in [ 'int32', 'uint8', 'float32', 'float64' ]:
284 B = A.astype(dt)
284 B = A.astype(dt)
285 C = view.apply_sync(lambda x:x, B)
285 C = view.apply_sync(lambda x:x, B)
286 assert_array_equal(B,C)
286 assert_array_equal(B,C)
287
287
288 @skip_without('numpy')
288 @skip_without('numpy')
289 def test_push_pull_recarray(self):
289 def test_push_pull_recarray(self):
290 """push/pull recarrays"""
290 """push/pull recarrays"""
291 import numpy
291 import numpy
292 from numpy.testing.utils import assert_array_equal
292 from numpy.testing.utils import assert_array_equal
293
293
294 view = self.client[-1]
294 view = self.client[-1]
295
295
296 R = numpy.array([
296 R = numpy.array([
297 (1, 'hi', 0.),
297 (1, 'hi', 0.),
298 (2**30, 'there', 2.5),
298 (2**30, 'there', 2.5),
299 (-99999, 'world', -12345.6789),
299 (-99999, 'world', -12345.6789),
300 ], [('n', int), ('s', '|S10'), ('f', float)])
300 ], [('n', int), ('s', '|S10'), ('f', float)])
301
301
302 view['RR'] = R
302 view['RR'] = R
303 R2 = view['RR']
303 R2 = view['RR']
304
304
305 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
305 r_dtype, r_shape = view.apply_sync(interactive(lambda : (RR.dtype, RR.shape)))
306 self.assertEqual(r_dtype, R.dtype)
306 self.assertEqual(r_dtype, R.dtype)
307 self.assertEqual(r_shape, R.shape)
307 self.assertEqual(r_shape, R.shape)
308 self.assertEqual(R2.dtype, R.dtype)
308 self.assertEqual(R2.dtype, R.dtype)
309 self.assertEqual(R2.shape, R.shape)
309 self.assertEqual(R2.shape, R.shape)
310 assert_array_equal(R2, R)
310 assert_array_equal(R2, R)
311
311
312 @skip_without('pandas')
312 @skip_without('pandas')
313 def test_push_pull_timeseries(self):
313 def test_push_pull_timeseries(self):
314 """push/pull pandas.TimeSeries"""
314 """push/pull pandas.TimeSeries"""
315 import pandas
315 import pandas
316
316
317 ts = pandas.TimeSeries(range(10))
317 ts = pandas.TimeSeries(range(10))
318
318
319 view = self.client[-1]
319 view = self.client[-1]
320
320
321 view.push(dict(ts=ts), block=True)
321 view.push(dict(ts=ts), block=True)
322 rts = view['ts']
322 rts = view['ts']
323
323
324 self.assertEqual(type(rts), type(ts))
324 self.assertEqual(type(rts), type(ts))
325 self.assertTrue((ts == rts).all())
325 self.assertTrue((ts == rts).all())
326
326
327 def test_map(self):
327 def test_map(self):
328 view = self.client[:]
328 view = self.client[:]
329 def f(x):
329 def f(x):
330 return x**2
330 return x**2
331 data = range(16)
331 data = range(16)
332 r = view.map_sync(f, data)
332 r = view.map_sync(f, data)
333 self.assertEqual(r, map(f, data))
333 self.assertEqual(r, map(f, data))
334
334
335 def test_map_iterable(self):
335 def test_map_iterable(self):
336 """test map on iterables (direct)"""
336 """test map on iterables (direct)"""
337 view = self.client[:]
337 view = self.client[:]
338 # 101 is prime, so it won't be evenly distributed
338 # 101 is prime, so it won't be evenly distributed
339 arr = range(101)
339 arr = range(101)
340 # ensure it will be an iterator, even in Python 3
340 # ensure it will be an iterator, even in Python 3
341 it = iter(arr)
341 it = iter(arr)
342 r = view.map_sync(lambda x:x, arr)
342 r = view.map_sync(lambda x:x, arr)
343 self.assertEqual(r, list(arr))
343 self.assertEqual(r, list(arr))
344
344
345 def test_scatter_gather_nonblocking(self):
345 def test_scatter_gather_nonblocking(self):
346 data = range(16)
346 data = range(16)
347 view = self.client[:]
347 view = self.client[:]
348 view.scatter('a', data, block=False)
348 view.scatter('a', data, block=False)
349 ar = view.gather('a', block=False)
349 ar = view.gather('a', block=False)
350 self.assertEqual(ar.get(), data)
350 self.assertEqual(ar.get(), data)
351
351
352 @skip_without('numpy')
352 @skip_without('numpy')
353 def test_scatter_gather_numpy_nonblocking(self):
353 def test_scatter_gather_numpy_nonblocking(self):
354 import numpy
354 import numpy
355 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
355 from numpy.testing.utils import assert_array_equal, assert_array_almost_equal
356 a = numpy.arange(64)
356 a = numpy.arange(64)
357 view = self.client[:]
357 view = self.client[:]
358 ar = view.scatter('a', a, block=False)
358 ar = view.scatter('a', a, block=False)
359 self.assertTrue(isinstance(ar, AsyncResult))
359 self.assertTrue(isinstance(ar, AsyncResult))
360 amr = view.gather('a', block=False)
360 amr = view.gather('a', block=False)
361 self.assertTrue(isinstance(amr, AsyncMapResult))
361 self.assertTrue(isinstance(amr, AsyncMapResult))
362 assert_array_equal(amr.get(), a)
362 assert_array_equal(amr.get(), a)
363
363
364 def test_execute(self):
364 def test_execute(self):
365 view = self.client[:]
365 view = self.client[:]
366 # self.client.debug=True
366 # self.client.debug=True
367 execute = view.execute
367 execute = view.execute
368 ar = execute('c=30', block=False)
368 ar = execute('c=30', block=False)
369 self.assertTrue(isinstance(ar, AsyncResult))
369 self.assertTrue(isinstance(ar, AsyncResult))
370 ar = execute('d=[0,1,2]', block=False)
370 ar = execute('d=[0,1,2]', block=False)
371 self.client.wait(ar, 1)
371 self.client.wait(ar, 1)
372 self.assertEqual(len(ar.get()), len(self.client))
372 self.assertEqual(len(ar.get()), len(self.client))
373 for c in view['c']:
373 for c in view['c']:
374 self.assertEqual(c, 30)
374 self.assertEqual(c, 30)
375
375
376 def test_abort(self):
376 def test_abort(self):
377 view = self.client[-1]
377 view = self.client[-1]
378 ar = view.execute('import time; time.sleep(1)', block=False)
378 ar = view.execute('import time; time.sleep(1)', block=False)
379 ar2 = view.apply_async(lambda : 2)
379 ar2 = view.apply_async(lambda : 2)
380 ar3 = view.apply_async(lambda : 3)
380 ar3 = view.apply_async(lambda : 3)
381 view.abort(ar2)
381 view.abort(ar2)
382 view.abort(ar3.msg_ids)
382 view.abort(ar3.msg_ids)
383 self.assertRaises(error.TaskAborted, ar2.get)
383 self.assertRaises(error.TaskAborted, ar2.get)
384 self.assertRaises(error.TaskAborted, ar3.get)
384 self.assertRaises(error.TaskAborted, ar3.get)
385
385
386 def test_abort_all(self):
386 def test_abort_all(self):
387 """view.abort() aborts all outstanding tasks"""
387 """view.abort() aborts all outstanding tasks"""
388 view = self.client[-1]
388 view = self.client[-1]
389 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
389 ars = [ view.apply_async(time.sleep, 0.25) for i in range(10) ]
390 view.abort()
390 view.abort()
391 view.wait(timeout=5)
391 view.wait(timeout=5)
392 for ar in ars[5:]:
392 for ar in ars[5:]:
393 self.assertRaises(error.TaskAborted, ar.get)
393 self.assertRaises(error.TaskAborted, ar.get)
394
394
395 def test_temp_flags(self):
395 def test_temp_flags(self):
396 view = self.client[-1]
396 view = self.client[-1]
397 view.block=True
397 view.block=True
398 with view.temp_flags(block=False):
398 with view.temp_flags(block=False):
399 self.assertFalse(view.block)
399 self.assertFalse(view.block)
400 self.assertTrue(view.block)
400 self.assertTrue(view.block)
401
401
402 @dec.known_failure_py3
402 @dec.known_failure_py3
403 def test_importer(self):
403 def test_importer(self):
404 view = self.client[-1]
404 view = self.client[-1]
405 view.clear(block=True)
405 view.clear(block=True)
406 with view.importer:
406 with view.importer:
407 import re
407 import re
408
408
409 @interactive
409 @interactive
410 def findall(pat, s):
410 def findall(pat, s):
411 # this globals() step isn't necessary in real code
411 # this globals() step isn't necessary in real code
412 # only to prevent a closure in the test
412 # only to prevent a closure in the test
413 re = globals()['re']
413 re = globals()['re']
414 return re.findall(pat, s)
414 return re.findall(pat, s)
415
415
416 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
416 self.assertEqual(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split())
417
417
418 def test_unicode_execute(self):
418 def test_unicode_execute(self):
419 """test executing unicode strings"""
419 """test executing unicode strings"""
420 v = self.client[-1]
420 v = self.client[-1]
421 v.block=True
421 v.block=True
422 if sys.version_info[0] >= 3:
422 if sys.version_info[0] >= 3:
423 code="a='é'"
423 code="a='é'"
424 else:
424 else:
425 code=u"a=u'é'"
425 code=u"a=u'é'"
426 v.execute(code)
426 v.execute(code)
427 self.assertEqual(v['a'], u'é')
427 self.assertEqual(v['a'], u'é')
428
428
429 def test_unicode_apply_result(self):
429 def test_unicode_apply_result(self):
430 """test unicode apply results"""
430 """test unicode apply results"""
431 v = self.client[-1]
431 v = self.client[-1]
432 r = v.apply_sync(lambda : u'é')
432 r = v.apply_sync(lambda : u'é')
433 self.assertEqual(r, u'é')
433 self.assertEqual(r, u'é')
434
434
435 def test_unicode_apply_arg(self):
435 def test_unicode_apply_arg(self):
436 """test passing unicode arguments to apply"""
436 """test passing unicode arguments to apply"""
437 v = self.client[-1]
437 v = self.client[-1]
438
438
439 @interactive
439 @interactive
440 def check_unicode(a, check):
440 def check_unicode(a, check):
441 assert isinstance(a, unicode), "%r is not unicode"%a
441 assert isinstance(a, unicode), "%r is not unicode"%a
442 assert isinstance(check, bytes), "%r is not bytes"%check
442 assert isinstance(check, bytes), "%r is not bytes"%check
443 assert a.encode('utf8') == check, "%s != %s"%(a,check)
443 assert a.encode('utf8') == check, "%s != %s"%(a,check)
444
444
445 for s in [ u'é', u'ßø®∫',u'asdf' ]:
445 for s in [ u'é', u'ßø®∫',u'asdf' ]:
446 try:
446 try:
447 v.apply_sync(check_unicode, s, s.encode('utf8'))
447 v.apply_sync(check_unicode, s, s.encode('utf8'))
448 except error.RemoteError as e:
448 except error.RemoteError as e:
449 if e.ename == 'AssertionError':
449 if e.ename == 'AssertionError':
450 self.fail(e.evalue)
450 self.fail(e.evalue)
451 else:
451 else:
452 raise e
452 raise e
453
453
454 def test_map_reference(self):
454 def test_map_reference(self):
455 """view.map(<Reference>, *seqs) should work"""
455 """view.map(<Reference>, *seqs) should work"""
456 v = self.client[:]
456 v = self.client[:]
457 v.scatter('n', self.client.ids, flatten=True)
457 v.scatter('n', self.client.ids, flatten=True)
458 v.execute("f = lambda x,y: x*y")
458 v.execute("f = lambda x,y: x*y")
459 rf = pmod.Reference('f')
459 rf = pmod.Reference('f')
460 nlist = list(range(10))
460 nlist = list(range(10))
461 mlist = nlist[::-1]
461 mlist = nlist[::-1]
462 expected = [ m*n for m,n in zip(mlist, nlist) ]
462 expected = [ m*n for m,n in zip(mlist, nlist) ]
463 result = v.map_sync(rf, mlist, nlist)
463 result = v.map_sync(rf, mlist, nlist)
464 self.assertEqual(result, expected)
464 self.assertEqual(result, expected)
465
465
466 def test_apply_reference(self):
466 def test_apply_reference(self):
467 """view.apply(<Reference>, *args) should work"""
467 """view.apply(<Reference>, *args) should work"""
468 v = self.client[:]
468 v = self.client[:]
469 v.scatter('n', self.client.ids, flatten=True)
469 v.scatter('n', self.client.ids, flatten=True)
470 v.execute("f = lambda x: n*x")
470 v.execute("f = lambda x: n*x")
471 rf = pmod.Reference('f')
471 rf = pmod.Reference('f')
472 result = v.apply_sync(rf, 5)
472 result = v.apply_sync(rf, 5)
473 expected = [ 5*id for id in self.client.ids ]
473 expected = [ 5*id for id in self.client.ids ]
474 self.assertEqual(result, expected)
474 self.assertEqual(result, expected)
475
475
476 def test_eval_reference(self):
476 def test_eval_reference(self):
477 v = self.client[self.client.ids[0]]
477 v = self.client[self.client.ids[0]]
478 v['g'] = range(5)
478 v['g'] = range(5)
479 rg = pmod.Reference('g[0]')
479 rg = pmod.Reference('g[0]')
480 echo = lambda x:x
480 echo = lambda x:x
481 self.assertEqual(v.apply_sync(echo, rg), 0)
481 self.assertEqual(v.apply_sync(echo, rg), 0)
482
482
483 def test_reference_nameerror(self):
483 def test_reference_nameerror(self):
484 v = self.client[self.client.ids[0]]
484 v = self.client[self.client.ids[0]]
485 r = pmod.Reference('elvis_has_left')
485 r = pmod.Reference('elvis_has_left')
486 echo = lambda x:x
486 echo = lambda x:x
487 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
487 self.assertRaisesRemote(NameError, v.apply_sync, echo, r)
488
488
489 def test_single_engine_map(self):
489 def test_single_engine_map(self):
490 e0 = self.client[self.client.ids[0]]
490 e0 = self.client[self.client.ids[0]]
491 r = range(5)
491 r = range(5)
492 check = [ -1*i for i in r ]
492 check = [ -1*i for i in r ]
493 result = e0.map_sync(lambda x: -1*x, r)
493 result = e0.map_sync(lambda x: -1*x, r)
494 self.assertEqual(result, check)
494 self.assertEqual(result, check)
495
495
496 def test_len(self):
496 def test_len(self):
497 """len(view) makes sense"""
497 """len(view) makes sense"""
498 e0 = self.client[self.client.ids[0]]
498 e0 = self.client[self.client.ids[0]]
499 yield self.assertEqual(len(e0), 1)
499 yield self.assertEqual(len(e0), 1)
500 v = self.client[:]
500 v = self.client[:]
501 yield self.assertEqual(len(v), len(self.client.ids))
501 yield self.assertEqual(len(v), len(self.client.ids))
502 v = self.client.direct_view('all')
502 v = self.client.direct_view('all')
503 yield self.assertEqual(len(v), len(self.client.ids))
503 yield self.assertEqual(len(v), len(self.client.ids))
504 v = self.client[:2]
504 v = self.client[:2]
505 yield self.assertEqual(len(v), 2)
505 yield self.assertEqual(len(v), 2)
506 v = self.client[:1]
506 v = self.client[:1]
507 yield self.assertEqual(len(v), 1)
507 yield self.assertEqual(len(v), 1)
508 v = self.client.load_balanced_view()
508 v = self.client.load_balanced_view()
509 yield self.assertEqual(len(v), len(self.client.ids))
509 yield self.assertEqual(len(v), len(self.client.ids))
510 # parametric tests seem to require manual closing?
510 # parametric tests seem to require manual closing?
511 self.client.close()
511 self.client.close()
512
512
513
513
514 # begin execute tests
514 # begin execute tests
515
515
516 def test_execute_reply(self):
516 def test_execute_reply(self):
517 e0 = self.client[self.client.ids[0]]
517 e0 = self.client[self.client.ids[0]]
518 e0.block = True
518 e0.block = True
519 ar = e0.execute("5", silent=False)
519 ar = e0.execute("5", silent=False)
520 er = ar.get()
520 er = ar.get()
521 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
521 self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
522 self.assertEqual(er.pyout['data']['text/plain'], '5')
522 self.assertEqual(er.pyout['data']['text/plain'], '5')
523
523
524 def test_execute_reply_stdout(self):
524 def test_execute_reply_stdout(self):
525 e0 = self.client[self.client.ids[0]]
525 e0 = self.client[self.client.ids[0]]
526 e0.block = True
526 e0.block = True
527 ar = e0.execute("print (5)", silent=False)
527 ar = e0.execute("print (5)", silent=False)
528 er = ar.get()
528 er = ar.get()
529 self.assertEqual(er.stdout.strip(), '5')
529 self.assertEqual(er.stdout.strip(), '5')
530
530
531 def test_execute_pyout(self):
531 def test_execute_pyout(self):
532 """execute triggers pyout with silent=False"""
532 """execute triggers pyout with silent=False"""
533 view = self.client[:]
533 view = self.client[:]
534 ar = view.execute("5", silent=False, block=True)
534 ar = view.execute("5", silent=False, block=True)
535
535
536 expected = [{'text/plain' : '5'}] * len(view)
536 expected = [{'text/plain' : '5'}] * len(view)
537 mimes = [ out['data'] for out in ar.pyout ]
537 mimes = [ out['data'] for out in ar.pyout ]
538 self.assertEqual(mimes, expected)
538 self.assertEqual(mimes, expected)
539
539
540 def test_execute_silent(self):
540 def test_execute_silent(self):
541 """execute does not trigger pyout with silent=True"""
541 """execute does not trigger pyout with silent=True"""
542 view = self.client[:]
542 view = self.client[:]
543 ar = view.execute("5", block=True)
543 ar = view.execute("5", block=True)
544 expected = [None] * len(view)
544 expected = [None] * len(view)
545 self.assertEqual(ar.pyout, expected)
545 self.assertEqual(ar.pyout, expected)
546
546
547 def test_execute_magic(self):
547 def test_execute_magic(self):
548 """execute accepts IPython commands"""
548 """execute accepts IPython commands"""
549 view = self.client[:]
549 view = self.client[:]
550 view.execute("a = 5")
550 view.execute("a = 5")
551 ar = view.execute("%whos", block=True)
551 ar = view.execute("%whos", block=True)
552 # this will raise, if that failed
552 # this will raise, if that failed
553 ar.get(5)
553 ar.get(5)
554 for stdout in ar.stdout:
554 for stdout in ar.stdout:
555 lines = stdout.splitlines()
555 lines = stdout.splitlines()
556 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
556 self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
557 found = False
557 found = False
558 for line in lines[2:]:
558 for line in lines[2:]:
559 split = line.split()
559 split = line.split()
560 if split == ['a', 'int', '5']:
560 if split == ['a', 'int', '5']:
561 found = True
561 found = True
562 break
562 break
563 self.assertTrue(found, "whos output wrong: %s" % stdout)
563 self.assertTrue(found, "whos output wrong: %s" % stdout)
564
564
565 def test_execute_displaypub(self):
565 def test_execute_displaypub(self):
566 """execute tracks display_pub output"""
566 """execute tracks display_pub output"""
567 view = self.client[:]
567 view = self.client[:]
568 view.execute("from IPython.core.display import *")
568 view.execute("from IPython.core.display import *")
569 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
569 ar = view.execute("[ display(i) for i in range(5) ]", block=True)
570
570
571 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
571 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
572 for outputs in ar.outputs:
572 for outputs in ar.outputs:
573 mimes = [ out['data'] for out in outputs ]
573 mimes = [ out['data'] for out in outputs ]
574 self.assertEqual(mimes, expected)
574 self.assertEqual(mimes, expected)
575
575
576 def test_apply_displaypub(self):
576 def test_apply_displaypub(self):
577 """apply tracks display_pub output"""
577 """apply tracks display_pub output"""
578 view = self.client[:]
578 view = self.client[:]
579 view.execute("from IPython.core.display import *")
579 view.execute("from IPython.core.display import *")
580
580
581 @interactive
581 @interactive
582 def publish():
582 def publish():
583 [ display(i) for i in range(5) ]
583 [ display(i) for i in range(5) ]
584
584
585 ar = view.apply_async(publish)
585 ar = view.apply_async(publish)
586 ar.get(5)
586 ar.get(5)
587 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
587 expected = [ {u'text/plain' : unicode(j)} for j in range(5) ]
588 for outputs in ar.outputs:
588 for outputs in ar.outputs:
589 mimes = [ out['data'] for out in outputs ]
589 mimes = [ out['data'] for out in outputs ]
590 self.assertEqual(mimes, expected)
590 self.assertEqual(mimes, expected)
591
591
592 def test_execute_raises(self):
592 def test_execute_raises(self):
593 """exceptions in execute requests raise appropriately"""
593 """exceptions in execute requests raise appropriately"""
594 view = self.client[-1]
594 view = self.client[-1]
595 ar = view.execute("1/0")
595 ar = view.execute("1/0")
596 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
596 self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)
597
597
598 def test_remoteerror_render_exception(self):
598 def test_remoteerror_render_exception(self):
599 """RemoteErrors get nice tracebacks"""
599 """RemoteErrors get nice tracebacks"""
600 view = self.client[-1]
600 view = self.client[-1]
601 ar = view.execute("1/0")
601 ar = view.execute("1/0")
602 ip = get_ipython()
602 ip = get_ipython()
603 ip.user_ns['ar'] = ar
603 ip.user_ns['ar'] = ar
604 with capture_output() as io:
604 with capture_output() as io:
605 ip.run_cell("ar.get(2)")
605 ip.run_cell("ar.get(2)")
606
606
607 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
607 self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)
608
608
609 def test_compositeerror_render_exception(self):
609 def test_compositeerror_render_exception(self):
610 """CompositeErrors get nice tracebacks"""
610 """CompositeErrors get nice tracebacks"""
611 view = self.client[:]
611 view = self.client[:]
612 ar = view.execute("1/0")
612 ar = view.execute("1/0")
613 ip = get_ipython()
613 ip = get_ipython()
614 ip.user_ns['ar'] = ar
614 ip.user_ns['ar'] = ar
615 with capture_output() as io:
615 with capture_output() as io:
616 ip.run_cell("ar.get(2)")
616 ip.run_cell("ar.get(2)")
617
617
618 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
618 self.assertEqual(io.stdout.count('ZeroDivisionError'), len(view) * 2, io.stdout)
619 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
619 self.assertEqual(io.stdout.count('by zero'), len(view), io.stdout)
620 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
620 self.assertEqual(io.stdout.count(':execute'), len(view), io.stdout)
621
621
622 def test_compositeerror_truncate(self):
623 """Truncate CompositeErrors with many exceptions"""
624 view = self.client[:]
625 msg_ids = []
626 for i in range(10):
627 ar = view.execute("1/0")
628 msg_ids.extend(ar.msg_ids)
629
630 ar = self.client.get_result(msg_ids)
631 try:
632 ar.get()
633 except error.CompositeError as e:
634 pass
635 else:
636 self.fail("Should have raised CompositeError")
637
638 lines = e.render_traceback()
639 with capture_output() as io:
640 e.print_traceback()
641
642 self.assertTrue("more exceptions" in lines[-1])
643 count = e.tb_limit
644
645 self.assertEqual(io.stdout.count('ZeroDivisionError'), 2 * count, io.stdout)
646 self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
647 self.assertEqual(io.stdout.count(':execute'), count, io.stdout)
648
622 @dec.skipif_not_matplotlib
649 @dec.skipif_not_matplotlib
623 def test_magic_pylab(self):
650 def test_magic_pylab(self):
624 """%pylab works on engines"""
651 """%pylab works on engines"""
625 view = self.client[-1]
652 view = self.client[-1]
626 ar = view.execute("%pylab inline")
653 ar = view.execute("%pylab inline")
627 # at least check if this raised:
654 # at least check if this raised:
628 reply = ar.get(5)
655 reply = ar.get(5)
629 # include imports, in case user config
656 # include imports, in case user config
630 ar = view.execute("plot(rand(100))", silent=False)
657 ar = view.execute("plot(rand(100))", silent=False)
631 reply = ar.get(5)
658 reply = ar.get(5)
632 self.assertEqual(len(reply.outputs), 1)
659 self.assertEqual(len(reply.outputs), 1)
633 output = reply.outputs[0]
660 output = reply.outputs[0]
634 self.assertTrue("data" in output)
661 self.assertTrue("data" in output)
635 data = output['data']
662 data = output['data']
636 self.assertTrue("image/png" in data)
663 self.assertTrue("image/png" in data)
637
664
638 def test_func_default_func(self):
665 def test_func_default_func(self):
639 """interactively defined function as apply func default"""
666 """interactively defined function as apply func default"""
640 def foo():
667 def foo():
641 return 'foo'
668 return 'foo'
642
669
643 def bar(f=foo):
670 def bar(f=foo):
644 return f()
671 return f()
645
672
646 view = self.client[-1]
673 view = self.client[-1]
647 ar = view.apply_async(bar)
674 ar = view.apply_async(bar)
648 r = ar.get(10)
675 r = ar.get(10)
649 self.assertEqual(r, 'foo')
676 self.assertEqual(r, 'foo')
650 def test_data_pub_single(self):
677 def test_data_pub_single(self):
651 view = self.client[-1]
678 view = self.client[-1]
652 ar = view.execute('\n'.join([
679 ar = view.execute('\n'.join([
653 'from IPython.kernel.zmq.datapub import publish_data',
680 'from IPython.kernel.zmq.datapub import publish_data',
654 'for i in range(5):',
681 'for i in range(5):',
655 ' publish_data(dict(i=i))'
682 ' publish_data(dict(i=i))'
656 ]), block=False)
683 ]), block=False)
657 self.assertTrue(isinstance(ar.data, dict))
684 self.assertTrue(isinstance(ar.data, dict))
658 ar.get(5)
685 ar.get(5)
659 self.assertEqual(ar.data, dict(i=4))
686 self.assertEqual(ar.data, dict(i=4))
660
687
661 def test_data_pub(self):
688 def test_data_pub(self):
662 view = self.client[:]
689 view = self.client[:]
663 ar = view.execute('\n'.join([
690 ar = view.execute('\n'.join([
664 'from IPython.kernel.zmq.datapub import publish_data',
691 'from IPython.kernel.zmq.datapub import publish_data',
665 'for i in range(5):',
692 'for i in range(5):',
666 ' publish_data(dict(i=i))'
693 ' publish_data(dict(i=i))'
667 ]), block=False)
694 ]), block=False)
668 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
695 self.assertTrue(all(isinstance(d, dict) for d in ar.data))
669 ar.get(5)
696 ar.get(5)
670 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
697 self.assertEqual(ar.data, [dict(i=4)] * len(ar))
671
698
672 def test_can_list_arg(self):
699 def test_can_list_arg(self):
673 """args in lists are canned"""
700 """args in lists are canned"""
674 view = self.client[-1]
701 view = self.client[-1]
675 view['a'] = 128
702 view['a'] = 128
676 rA = pmod.Reference('a')
703 rA = pmod.Reference('a')
677 ar = view.apply_async(lambda x: x, [rA])
704 ar = view.apply_async(lambda x: x, [rA])
678 r = ar.get(5)
705 r = ar.get(5)
679 self.assertEqual(r, [128])
706 self.assertEqual(r, [128])
680
707
681 def test_can_dict_arg(self):
708 def test_can_dict_arg(self):
682 """args in dicts are canned"""
709 """args in dicts are canned"""
683 view = self.client[-1]
710 view = self.client[-1]
684 view['a'] = 128
711 view['a'] = 128
685 rA = pmod.Reference('a')
712 rA = pmod.Reference('a')
686 ar = view.apply_async(lambda x: x, dict(foo=rA))
713 ar = view.apply_async(lambda x: x, dict(foo=rA))
687 r = ar.get(5)
714 r = ar.get(5)
688 self.assertEqual(r, dict(foo=128))
715 self.assertEqual(r, dict(foo=128))
689
716
690 def test_can_list_kwarg(self):
717 def test_can_list_kwarg(self):
691 """kwargs in lists are canned"""
718 """kwargs in lists are canned"""
692 view = self.client[-1]
719 view = self.client[-1]
693 view['a'] = 128
720 view['a'] = 128
694 rA = pmod.Reference('a')
721 rA = pmod.Reference('a')
695 ar = view.apply_async(lambda x=5: x, x=[rA])
722 ar = view.apply_async(lambda x=5: x, x=[rA])
696 r = ar.get(5)
723 r = ar.get(5)
697 self.assertEqual(r, [128])
724 self.assertEqual(r, [128])
698
725
699 def test_can_dict_kwarg(self):
726 def test_can_dict_kwarg(self):
700 """kwargs in dicts are canned"""
727 """kwargs in dicts are canned"""
701 view = self.client[-1]
728 view = self.client[-1]
702 view['a'] = 128
729 view['a'] = 128
703 rA = pmod.Reference('a')
730 rA = pmod.Reference('a')
704 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
731 ar = view.apply_async(lambda x=5: x, dict(foo=rA))
705 r = ar.get(5)
732 r = ar.get(5)
706 self.assertEqual(r, dict(foo=128))
733 self.assertEqual(r, dict(foo=128))
707
734
708 def test_map_ref(self):
735 def test_map_ref(self):
709 """view.map works with references"""
736 """view.map works with references"""
710 view = self.client[:]
737 view = self.client[:]
711 ranks = sorted(self.client.ids)
738 ranks = sorted(self.client.ids)
712 view.scatter('rank', ranks, flatten=True)
739 view.scatter('rank', ranks, flatten=True)
713 rrank = pmod.Reference('rank')
740 rrank = pmod.Reference('rank')
714
741
715 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
742 amr = view.map_async(lambda x: x*2, [rrank] * len(view))
716 drank = amr.get(5)
743 drank = amr.get(5)
717 self.assertEqual(drank, [ r*2 for r in ranks ])
744 self.assertEqual(drank, [ r*2 for r in ranks ])
718
745
719 def test_nested_getitem_setitem(self):
746 def test_nested_getitem_setitem(self):
720 """get and set with view['a.b']"""
747 """get and set with view['a.b']"""
721 view = self.client[-1]
748 view = self.client[-1]
722 view.execute('\n'.join([
749 view.execute('\n'.join([
723 'class A(object): pass',
750 'class A(object): pass',
724 'a = A()',
751 'a = A()',
725 'a.b = 128',
752 'a.b = 128',
726 ]), block=True)
753 ]), block=True)
727 ra = pmod.Reference('a')
754 ra = pmod.Reference('a')
728
755
729 r = view.apply_sync(lambda x: x.b, ra)
756 r = view.apply_sync(lambda x: x.b, ra)
730 self.assertEqual(r, 128)
757 self.assertEqual(r, 128)
731 self.assertEqual(view['a.b'], 128)
758 self.assertEqual(view['a.b'], 128)
732
759
733 view['a.b'] = 0
760 view['a.b'] = 0
734
761
735 r = view.apply_sync(lambda x: x.b, ra)
762 r = view.apply_sync(lambda x: x.b, ra)
736 self.assertEqual(r, 0)
763 self.assertEqual(r, 0)
737 self.assertEqual(view['a.b'], 0)
764 self.assertEqual(view['a.b'], 0)
General Comments 0
You need to be logged in to leave comments. Login now