##// END OF EJS Templates
add rich AsyncResult behavior
MinRK -
Show More
@@ -1,113 +1,186 b''
1 """AsyncResult objects for the client"""
1 """AsyncResult objects for the client"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.external.decorator import decorator
13 import error
14 import error
14
15
15 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
16 # Classes
17 # Classes
17 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
18
19
20 @decorator
21 def check_ready(f, self, *args, **kwargs):
22 """Call spin() to sync state prior to calling the method."""
23 self.wait(0)
24 if not self._ready:
25 raise error.TimeoutError("result not ready")
26 return f(self, *args, **kwargs)
27
19 class AsyncResult(object):
28 class AsyncResult(object):
20 """Class for representing results of non-blocking calls.
29 """Class for representing results of non-blocking calls.
21
30
22 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
31 Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
23 """
32 """
24 def __init__(self, client, msg_ids, fname=''):
33 def __init__(self, client, msg_ids, fname=''):
25 self._client = client
34 self._client = client
26 self.msg_ids = msg_ids
35 self.msg_ids = msg_ids
27 self._fname=fname
36 self._fname=fname
28 self._ready = False
37 self._ready = False
29 self._success = None
38 self._success = None
30
39
31 def __repr__(self):
40 def __repr__(self):
32 if self._ready:
41 if self._ready:
33 return "<%s: finished>"%(self.__class__.__name__)
42 return "<%s: finished>"%(self.__class__.__name__)
34 else:
43 else:
35 return "<%s: %s>"%(self.__class__.__name__,self._fname)
44 return "<%s: %s>"%(self.__class__.__name__,self._fname)
36
45
37
46
38 def _reconstruct_result(self, res):
47 def _reconstruct_result(self, res):
39 """
48 """
40 Override me in subclasses for turning a list of results
49 Override me in subclasses for turning a list of results
41 into the expected form.
50 into the expected form.
42 """
51 """
43 if len(self.msg_ids) == 1:
52 if len(self.msg_ids) == 1:
44 return res[0]
53 return res[0]
45 else:
54 else:
46 return res
55 return res
47
56
48 def get(self, timeout=-1):
57 def get(self, timeout=-1):
49 """Return the result when it arrives.
58 """Return the result when it arrives.
50
59
51 If `timeout` is not ``None`` and the result does not arrive within
60 If `timeout` is not ``None`` and the result does not arrive within
52 `timeout` seconds then ``TimeoutError`` is raised. If the
61 `timeout` seconds then ``TimeoutError`` is raised. If the
53 remote call raised an exception then that exception will be reraised
62 remote call raised an exception then that exception will be reraised
54 by get().
63 by get().
55 """
64 """
56 if not self.ready():
65 if not self.ready():
57 self.wait(timeout)
66 self.wait(timeout)
58
67
59 if self._ready:
68 if self._ready:
60 if self._success:
69 if self._success:
61 return self._result
70 return self._result
62 else:
71 else:
63 raise self._exception
72 raise self._exception
64 else:
73 else:
65 raise error.TimeoutError("Result not ready.")
74 raise error.TimeoutError("Result not ready.")
66
75
67 def ready(self):
76 def ready(self):
68 """Return whether the call has completed."""
77 """Return whether the call has completed."""
69 if not self._ready:
78 if not self._ready:
70 self.wait(0)
79 self.wait(0)
71 return self._ready
80 return self._ready
72
81
73 def wait(self, timeout=-1):
82 def wait(self, timeout=-1):
74 """Wait until the result is available or until `timeout` seconds pass.
83 """Wait until the result is available or until `timeout` seconds pass.
75 """
84 """
76 if self._ready:
85 if self._ready:
77 return
86 return
78 self._ready = self._client.barrier(self.msg_ids, timeout)
87 self._ready = self._client.barrier(self.msg_ids, timeout)
79 if self._ready:
88 if self._ready:
80 try:
89 try:
81 results = map(self._client.results.get, self.msg_ids)
90 results = map(self._client.results.get, self.msg_ids)
91 self._result = results
82 results = error.collect_exceptions(results, self._fname)
92 results = error.collect_exceptions(results, self._fname)
83 self._result = self._reconstruct_result(results)
93 self._result = self._reconstruct_result(results)
84 except Exception, e:
94 except Exception, e:
85 self._exception = e
95 self._exception = e
86 self._success = False
96 self._success = False
87 else:
97 else:
88 self._success = True
98 self._success = True
99 finally:
100 self._metadata = map(self._client.metadata.get, self.msg_ids)
89
101
90
102
91 def successful(self):
103 def successful(self):
92 """Return whether the call completed without raising an exception.
104 """Return whether the call completed without raising an exception.
93
105
94 Will raise ``AssertionError`` if the result is not ready.
106 Will raise ``AssertionError`` if the result is not ready.
95 """
107 """
96 assert self._ready
108 assert self._ready
97 return self._success
109 return self._success
110
111 #----------------------------------------------------------------
112 # Extra methods not in mp.pool.AsyncResult
113 #----------------------------------------------------------------
114
115 def get_dict(self, timeout=-1):
116 """Get the results as a dict, keyed by engine_id."""
117 results = self.get(timeout)
118 engine_ids = [md['engine_id'] for md in self._metadata ]
119 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
120 maxcount = bycount.count(bycount[-1])
121 if maxcount > 1:
122 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
123 maxcount, bycount[-1]))
124
125 return dict(zip(engine_ids,results))
126
127 @property
128 @check_ready
129 def result(self):
130 """result property."""
131 return self._result
132
133 @property
134 @check_ready
135 def metadata(self):
136 """metadata property."""
137 return self._metadata
138
139 @property
140 @check_ready
141 def result_dict(self):
142 """result property as a dict."""
143 return self.get_dict(0)
144
145 #-------------------------------------
146 # dict-access
147 #-------------------------------------
148
149 @check_ready
150 def __getitem__(self, key):
151 """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
152 """
153 if isinstance(key, int):
154 return error.collect_exceptions([self._result[key]], self._fname)[0]
155 elif isinstance(key, slice):
156 return error.collect_exceptions(self._result[key], self._fname)
157 elif isinstance(key, basestring):
158 return [ md[key] for md in self._metadata ]
159 else:
160 raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
161
162 @check_ready
163 def __getattr__(self, key):
164 """getattr maps to getitem for convenient access to metadata."""
165 if key not in self._metadata[0].keys():
166 raise AttributeError("%r object has no attribute %r"%(
167 self.__class__.__name__, key))
168 return self.__getitem__(key)
98
169
170
99 class AsyncMapResult(AsyncResult):
171 class AsyncMapResult(AsyncResult):
100 """Class for representing results of non-blocking gathers.
172 """Class for representing results of non-blocking gathers.
101
173
102 This will properly reconstruct the gather.
174 This will properly reconstruct the gather.
103 """
175 """
104
176
105 def __init__(self, client, msg_ids, mapObject, fname=''):
177 def __init__(self, client, msg_ids, mapObject, fname=''):
106 self._mapObject = mapObject
178 self._mapObject = mapObject
107 AsyncResult.__init__(self, client, msg_ids, fname=fname)
179 AsyncResult.__init__(self, client, msg_ids, fname=fname)
108
180
109 def _reconstruct_result(self, res):
181 def _reconstruct_result(self, res):
110 """Perform the gather on the actual results."""
182 """Perform the gather on the actual results."""
111 return self._mapObject.joinPartitions(res)
183 return self._mapObject.joinPartitions(res)
112
184
113
185
186 __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file
@@ -1,283 +1,283 b''
1 # encoding: utf-8
1 # encoding: utf-8
2
2
3 """Classes and functions for kernel related errors and exceptions."""
3 """Classes and functions for kernel related errors and exceptions."""
4 from __future__ import print_function
4 from __future__ import print_function
5
5
6 __docformat__ = "restructuredtext en"
6 __docformat__ = "restructuredtext en"
7
7
8 # Tell nose to skip this module
8 # Tell nose to skip this module
9 __test__ = {}
9 __test__ = {}
10
10
11 #-------------------------------------------------------------------------------
11 #-------------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
12 # Copyright (C) 2008 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-------------------------------------------------------------------------------
16 #-------------------------------------------------------------------------------
17
17
18 #-------------------------------------------------------------------------------
18 #-------------------------------------------------------------------------------
19 # Error classes
19 # Error classes
20 #-------------------------------------------------------------------------------
20 #-------------------------------------------------------------------------------
21 class IPythonError(Exception):
21 class IPythonError(Exception):
22 """Base exception that all of our exceptions inherit from.
22 """Base exception that all of our exceptions inherit from.
23
23
24 This can be raised by code that doesn't have any more specific
24 This can be raised by code that doesn't have any more specific
25 information."""
25 information."""
26
26
27 pass
27 pass
28
28
29 # Exceptions associated with the controller objects
29 # Exceptions associated with the controller objects
30 class ControllerError(IPythonError): pass
30 class ControllerError(IPythonError): pass
31
31
32 class ControllerCreationError(ControllerError): pass
32 class ControllerCreationError(ControllerError): pass
33
33
34
34
35 # Exceptions associated with the Engines
35 # Exceptions associated with the Engines
36 class EngineError(IPythonError): pass
36 class EngineError(IPythonError): pass
37
37
38 class EngineCreationError(EngineError): pass
38 class EngineCreationError(EngineError): pass
39
39
40 class KernelError(IPythonError):
40 class KernelError(IPythonError):
41 pass
41 pass
42
42
43 class NotDefined(KernelError):
43 class NotDefined(KernelError):
44 def __init__(self, name):
44 def __init__(self, name):
45 self.name = name
45 self.name = name
46 self.args = (name,)
46 self.args = (name,)
47
47
48 def __repr__(self):
48 def __repr__(self):
49 return '<NotDefined: %s>' % self.name
49 return '<NotDefined: %s>' % self.name
50
50
51 __str__ = __repr__
51 __str__ = __repr__
52
52
53
53
54 class QueueCleared(KernelError):
54 class QueueCleared(KernelError):
55 pass
55 pass
56
56
57
57
58 class IdInUse(KernelError):
58 class IdInUse(KernelError):
59 pass
59 pass
60
60
61
61
62 class ProtocolError(KernelError):
62 class ProtocolError(KernelError):
63 pass
63 pass
64
64
65
65
66 class ConnectionError(KernelError):
66 class ConnectionError(KernelError):
67 pass
67 pass
68
68
69
69
70 class InvalidEngineID(KernelError):
70 class InvalidEngineID(KernelError):
71 pass
71 pass
72
72
73
73
74 class NoEnginesRegistered(KernelError):
74 class NoEnginesRegistered(KernelError):
75 pass
75 pass
76
76
77
77
78 class InvalidClientID(KernelError):
78 class InvalidClientID(KernelError):
79 pass
79 pass
80
80
81
81
82 class InvalidDeferredID(KernelError):
82 class InvalidDeferredID(KernelError):
83 pass
83 pass
84
84
85
85
86 class SerializationError(KernelError):
86 class SerializationError(KernelError):
87 pass
87 pass
88
88
89
89
90 class MessageSizeError(KernelError):
90 class MessageSizeError(KernelError):
91 pass
91 pass
92
92
93
93
94 class PBMessageSizeError(MessageSizeError):
94 class PBMessageSizeError(MessageSizeError):
95 pass
95 pass
96
96
97
97
98 class ResultNotCompleted(KernelError):
98 class ResultNotCompleted(KernelError):
99 pass
99 pass
100
100
101
101
102 class ResultAlreadyRetrieved(KernelError):
102 class ResultAlreadyRetrieved(KernelError):
103 pass
103 pass
104
104
105 class ClientError(KernelError):
105 class ClientError(KernelError):
106 pass
106 pass
107
107
108
108
109 class TaskAborted(KernelError):
109 class TaskAborted(KernelError):
110 pass
110 pass
111
111
112
112
113 class TaskTimeout(KernelError):
113 class TaskTimeout(KernelError):
114 pass
114 pass
115
115
116
116
117 class NotAPendingResult(KernelError):
117 class NotAPendingResult(KernelError):
118 pass
118 pass
119
119
120
120
121 class UnpickleableException(KernelError):
121 class UnpickleableException(KernelError):
122 pass
122 pass
123
123
124
124
125 class AbortedPendingDeferredError(KernelError):
125 class AbortedPendingDeferredError(KernelError):
126 pass
126 pass
127
127
128
128
129 class InvalidProperty(KernelError):
129 class InvalidProperty(KernelError):
130 pass
130 pass
131
131
132
132
133 class MissingBlockArgument(KernelError):
133 class MissingBlockArgument(KernelError):
134 pass
134 pass
135
135
136
136
137 class StopLocalExecution(KernelError):
137 class StopLocalExecution(KernelError):
138 pass
138 pass
139
139
140
140
141 class SecurityError(KernelError):
141 class SecurityError(KernelError):
142 pass
142 pass
143
143
144
144
145 class FileTimeoutError(KernelError):
145 class FileTimeoutError(KernelError):
146 pass
146 pass
147
147
148 class TimeoutError(KernelError):
148 class TimeoutError(KernelError):
149 pass
149 pass
150
150
151 class RemoteError(KernelError):
151 class RemoteError(KernelError):
152 """Error raised elsewhere"""
152 """Error raised elsewhere"""
153 ename=None
153 ename=None
154 evalue=None
154 evalue=None
155 traceback=None
155 traceback=None
156 engine_info=None
156 engine_info=None
157
157
158 def __init__(self, ename, evalue, traceback, engine_info=None):
158 def __init__(self, ename, evalue, traceback, engine_info=None):
159 self.ename=ename
159 self.ename=ename
160 self.evalue=evalue
160 self.evalue=evalue
161 self.traceback=traceback
161 self.traceback=traceback
162 self.engine_info=engine_info or {}
162 self.engine_info=engine_info or {}
163 self.args=(ename, evalue)
163 self.args=(ename, evalue)
164
164
165 def __repr__(self):
165 def __repr__(self):
166 engineid = self.engine_info.get('engineid', ' ')
166 engineid = self.engine_info.get('engineid', ' ')
167 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
167 return "<Remote[%s]:%s(%s)>"%(engineid, self.ename, self.evalue)
168
168
169 def __str__(self):
169 def __str__(self):
170 sig = "%s(%s)"%(self.ename, self.evalue)
170 sig = "%s(%s)"%(self.ename, self.evalue)
171 if self.traceback:
171 if self.traceback:
172 return sig + '\n' + self.traceback
172 return sig + '\n' + self.traceback
173 else:
173 else:
174 return sig
174 return sig
175
175
176
176
177 class TaskRejectError(KernelError):
177 class TaskRejectError(KernelError):
178 """Exception to raise when a task should be rejected by an engine.
178 """Exception to raise when a task should be rejected by an engine.
179
179
180 This exception can be used to allow a task running on an engine to test
180 This exception can be used to allow a task running on an engine to test
181 if the engine (or the user's namespace on the engine) has the needed
181 if the engine (or the user's namespace on the engine) has the needed
182 task dependencies. If not, the task should raise this exception. For
182 task dependencies. If not, the task should raise this exception. For
183 the task to be retried on another engine, the task should be created
183 the task to be retried on another engine, the task should be created
184 with the `retries` argument > 1.
184 with the `retries` argument > 1.
185
185
186 The advantage of this approach over our older properties system is that
186 The advantage of this approach over our older properties system is that
187 tasks have full access to the user's namespace on the engines and the
187 tasks have full access to the user's namespace on the engines and the
188 properties don't have to be managed or tested by the controller.
188 properties don't have to be managed or tested by the controller.
189 """
189 """
190
190
191
191
192 class CompositeError(KernelError):
192 class CompositeError(KernelError):
193 """Error for representing possibly multiple errors on engines"""
193 """Error for representing possibly multiple errors on engines"""
194 def __init__(self, message, elist):
194 def __init__(self, message, elist):
195 Exception.__init__(self, *(message, elist))
195 Exception.__init__(self, *(message, elist))
196 # Don't use pack_exception because it will conflict with the .message
196 # Don't use pack_exception because it will conflict with the .message
197 # attribute that is being deprecated in 2.6 and beyond.
197 # attribute that is being deprecated in 2.6 and beyond.
198 self.msg = message
198 self.msg = message
199 self.elist = elist
199 self.elist = elist
200 self.args = [ e[0] for e in elist ]
200 self.args = [ e[0] for e in elist ]
201
201
202 def _get_engine_str(self, ei):
202 def _get_engine_str(self, ei):
203 if not ei:
203 if not ei:
204 return '[Engine Exception]'
204 return '[Engine Exception]'
205 else:
205 else:
206 return '[%i:%s]: ' % (ei['engineid'], ei['method'])
206 return '[%i:%s]: ' % (ei['engineid'], ei['method'])
207
207
208 def _get_traceback(self, ev):
208 def _get_traceback(self, ev):
209 try:
209 try:
210 tb = ev._ipython_traceback_text
210 tb = ev._ipython_traceback_text
211 except AttributeError:
211 except AttributeError:
212 return 'No traceback available'
212 return 'No traceback available'
213 else:
213 else:
214 return tb
214 return tb
215
215
216 def __str__(self):
216 def __str__(self):
217 s = str(self.msg)
217 s = str(self.msg)
218 for en, ev, etb, ei in self.elist:
218 for en, ev, etb, ei in self.elist:
219 engine_str = self._get_engine_str(ei)
219 engine_str = self._get_engine_str(ei)
220 s = s + '\n' + engine_str + en + ': ' + str(ev)
220 s = s + '\n' + engine_str + en + ': ' + str(ev)
221 return s
221 return s
222
222
223 def __repr__(self):
223 def __repr__(self):
224 return "CompositeError(%i)"%len(self.elist)
224 return "CompositeError(%i)"%len(self.elist)
225
225
226 def print_tracebacks(self, excid=None):
226 def print_tracebacks(self, excid=None):
227 if excid is None:
227 if excid is None:
228 for (en,ev,etb,ei) in self.elist:
228 for (en,ev,etb,ei) in self.elist:
229 print (self._get_engine_str(ei))
229 print (self._get_engine_str(ei))
230 print (etb or 'No traceback available')
230 print (etb or 'No traceback available')
231 print ()
231 print ()
232 else:
232 else:
233 try:
233 try:
234 en,ev,etb,ei = self.elist[excid]
234 en,ev,etb,ei = self.elist[excid]
235 except:
235 except:
236 raise IndexError("an exception with index %i does not exist"%excid)
236 raise IndexError("an exception with index %i does not exist"%excid)
237 else:
237 else:
238 print (self._get_engine_str(ei))
238 print (self._get_engine_str(ei))
239 print (etb or 'No traceback available')
239 print (etb or 'No traceback available')
240
240
241 def raise_exception(self, excid=0):
241 def raise_exception(self, excid=0):
242 try:
242 try:
243 en,ev,etb,ei = self.elist[excid]
243 en,ev,etb,ei = self.elist[excid]
244 except:
244 except:
245 raise IndexError("an exception with index %i does not exist"%excid)
245 raise IndexError("an exception with index %i does not exist"%excid)
246 else:
246 else:
247 try:
247 try:
248 raise RemoteError(en, ev, etb, ei)
248 raise RemoteError(en, ev, etb, ei)
249 except:
249 except:
250 et,ev,tb = sys.exc_info()
250 et,ev,tb = sys.exc_info()
251
251
252
252
253 def collect_exceptions(rdict_or_list, method):
253 def collect_exceptions(rdict_or_list, method='unspecified'):
254 """check a result dict for errors, and raise CompositeError if any exist.
254 """check a result dict for errors, and raise CompositeError if any exist.
255 Passthrough otherwise."""
255 Passthrough otherwise."""
256 elist = []
256 elist = []
257 if isinstance(rdict_or_list, dict):
257 if isinstance(rdict_or_list, dict):
258 rlist = rdict_or_list.values()
258 rlist = rdict_or_list.values()
259 else:
259 else:
260 rlist = rdict_or_list
260 rlist = rdict_or_list
261 for r in rlist:
261 for r in rlist:
262 if isinstance(r, RemoteError):
262 if isinstance(r, RemoteError):
263 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
263 en, ev, etb, ei = r.ename, r.evalue, r.traceback, r.engine_info
264 # Sometimes we could have CompositeError in our list. Just take
264 # Sometimes we could have CompositeError in our list. Just take
265 # the errors out of them and put them in our new list. This
265 # the errors out of them and put them in our new list. This
266 # has the effect of flattening lists of CompositeErrors into one
266 # has the effect of flattening lists of CompositeErrors into one
267 # CompositeError
267 # CompositeError
268 if en=='CompositeError':
268 if en=='CompositeError':
269 for e in ev.elist:
269 for e in ev.elist:
270 elist.append(e)
270 elist.append(e)
271 else:
271 else:
272 elist.append((en, ev, etb, ei))
272 elist.append((en, ev, etb, ei))
273 if len(elist)==0:
273 if len(elist)==0:
274 return rdict_or_list
274 return rdict_or_list
275 else:
275 else:
276 msg = "one or more exceptions from call to method: %s" % (method)
276 msg = "one or more exceptions from call to method: %s" % (method)
277 # This silliness is needed so the debugger has access to the exception
277 # This silliness is needed so the debugger has access to the exception
278 # instance (e in this case)
278 # instance (e in this case)
279 try:
279 try:
280 raise CompositeError(msg, elist)
280 raise CompositeError(msg, elist)
281 except CompositeError, e:
281 except CompositeError, e:
282 raise e
282 raise e
283
283
General Comments 0
You need to be logged in to leave comments. Login now