Show More
@@ -10,12 +10,21 b'' | |||||
10 | # Imports |
|
10 | # Imports | |
11 | #----------------------------------------------------------------------------- |
|
11 | #----------------------------------------------------------------------------- | |
12 |
|
12 | |||
|
13 | from IPython.external.decorator import decorator | |||
13 | import error |
|
14 | import error | |
14 |
|
15 | |||
15 | #----------------------------------------------------------------------------- |
|
16 | #----------------------------------------------------------------------------- | |
16 | # Classes |
|
17 | # Classes | |
17 | #----------------------------------------------------------------------------- |
|
18 | #----------------------------------------------------------------------------- | |
18 |
|
19 | |||
|
20 | @decorator | |||
|
21 | def check_ready(f, self, *args, **kwargs): | |||
|
22 | """Call spin() to sync state prior to calling the method.""" | |||
|
23 | self.wait(0) | |||
|
24 | if not self._ready: | |||
|
25 | raise error.TimeoutError("result not ready") | |||
|
26 | return f(self, *args, **kwargs) | |||
|
27 | ||||
19 | class AsyncResult(object): |
|
28 | class AsyncResult(object): | |
20 | """Class for representing results of non-blocking calls. |
|
29 | """Class for representing results of non-blocking calls. | |
21 |
|
30 | |||
@@ -79,6 +88,7 b' class AsyncResult(object):' | |||||
79 | if self._ready: |
|
88 | if self._ready: | |
80 | try: |
|
89 | try: | |
81 | results = map(self._client.results.get, self.msg_ids) |
|
90 | results = map(self._client.results.get, self.msg_ids) | |
|
91 | self._result = results | |||
82 | results = error.collect_exceptions(results, self._fname) |
|
92 | results = error.collect_exceptions(results, self._fname) | |
83 | self._result = self._reconstruct_result(results) |
|
93 | self._result = self._reconstruct_result(results) | |
84 | except Exception, e: |
|
94 | except Exception, e: | |
@@ -86,6 +96,8 b' class AsyncResult(object):' | |||||
86 | self._success = False |
|
96 | self._success = False | |
87 | else: |
|
97 | else: | |
88 | self._success = True |
|
98 | self._success = True | |
|
99 | finally: | |||
|
100 | self._metadata = map(self._client.metadata.get, self.msg_ids) | |||
89 |
|
101 | |||
90 |
|
102 | |||
91 | def successful(self): |
|
103 | def successful(self): | |
@@ -95,7 +107,67 b' class AsyncResult(object):' | |||||
95 | """ |
|
107 | """ | |
96 | assert self._ready |
|
108 | assert self._ready | |
97 | return self._success |
|
109 | return self._success | |
|
110 | ||||
|
111 | #---------------------------------------------------------------- | |||
|
112 | # Extra methods not in mp.pool.AsyncResult | |||
|
113 | #---------------------------------------------------------------- | |||
|
114 | ||||
|
115 | def get_dict(self, timeout=-1): | |||
|
116 | """Get the results as a dict, keyed by engine_id.""" | |||
|
117 | results = self.get(timeout) | |||
|
118 | engine_ids = [md['engine_id'] for md in self._metadata ] | |||
|
119 | bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k)) | |||
|
120 | maxcount = bycount.count(bycount[-1]) | |||
|
121 | if maxcount > 1: | |||
|
122 | raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%( | |||
|
123 | maxcount, bycount[-1])) | |||
|
124 | ||||
|
125 | return dict(zip(engine_ids,results)) | |||
|
126 | ||||
|
127 | @property | |||
|
128 | @check_ready | |||
|
129 | def result(self): | |||
|
130 | """result property.""" | |||
|
131 | return self._result | |||
|
132 | ||||
|
133 | @property | |||
|
134 | @check_ready | |||
|
135 | def metadata(self): | |||
|
136 | """metadata property.""" | |||
|
137 | return self._metadata | |||
|
138 | ||||
|
139 | @property | |||
|
140 | @check_ready | |||
|
141 | def result_dict(self): | |||
|
142 | """result property as a dict.""" | |||
|
143 | return self.get_dict(0) | |||
|
144 | ||||
|
145 | #------------------------------------- | |||
|
146 | # dict-access | |||
|
147 | #------------------------------------- | |||
|
148 | ||||
|
149 | @check_ready | |||
|
150 | def __getitem__(self, key): | |||
|
151 | """getitem returns result value(s) if keyed by int/slice, or metadata if key is str. | |||
|
152 | """ | |||
|
153 | if isinstance(key, int): | |||
|
154 | return error.collect_exceptions([self._result[key]], self._fname)[0] | |||
|
155 | elif isinstance(key, slice): | |||
|
156 | return error.collect_exceptions(self._result[key], self._fname) | |||
|
157 | elif isinstance(key, basestring): | |||
|
158 | return [ md[key] for md in self._metadata ] | |||
|
159 | else: | |||
|
160 | raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key)) | |||
|
161 | ||||
|
162 | @check_ready | |||
|
163 | def __getattr__(self, key): | |||
|
164 | """getattr maps to getitem for convenient access to metadata.""" | |||
|
165 | if key not in self._metadata[0].keys(): | |||
|
166 | raise AttributeError("%r object has no attribute %r"%( | |||
|
167 | self.__class__.__name__, key)) | |||
|
168 | return self.__getitem__(key) | |||
98 |
|
169 | |||
|
170 | ||||
99 | class AsyncMapResult(AsyncResult): |
|
171 | class AsyncMapResult(AsyncResult): | |
100 | """Class for representing results of non-blocking gathers. |
|
172 | """Class for representing results of non-blocking gathers. | |
101 |
|
173 | |||
@@ -111,3 +183,4 b' class AsyncMapResult(AsyncResult):' | |||||
111 | return self._mapObject.joinPartitions(res) |
|
183 | return self._mapObject.joinPartitions(res) | |
112 |
|
184 | |||
113 |
|
185 | |||
|
186 | __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file |
@@ -250,7 +250,7 b' class CompositeError(KernelError):' | |||||
250 | et,ev,tb = sys.exc_info() |
|
250 | et,ev,tb = sys.exc_info() | |
251 |
|
251 | |||
252 |
|
252 | |||
253 | def collect_exceptions(rdict_or_list, method): |
|
253 | def collect_exceptions(rdict_or_list, method='unspecified'): | |
254 | """check a result dict for errors, and raise CompositeError if any exist. |
|
254 | """check a result dict for errors, and raise CompositeError if any exist. | |
255 | Passthrough otherwise.""" |
|
255 | Passthrough otherwise.""" | |
256 | elist = [] |
|
256 | elist = [] |
General Comments 0
You need to be logged in to leave comments.
Login now