Show More
@@ -10,12 +10,21 | |||
|
10 | 10 | # Imports |
|
11 | 11 | #----------------------------------------------------------------------------- |
|
12 | 12 | |
|
13 | from IPython.external.decorator import decorator | |
|
13 | 14 | import error |
|
14 | 15 | |
|
15 | 16 | #----------------------------------------------------------------------------- |
|
16 | 17 | # Classes |
|
17 | 18 | #----------------------------------------------------------------------------- |
|
18 | 19 | |
|
20 | @decorator | |
|
21 | def check_ready(f, self, *args, **kwargs): | |
|
22 | """Call spin() to sync state prior to calling the method.""" | |
|
23 | self.wait(0) | |
|
24 | if not self._ready: | |
|
25 | raise error.TimeoutError("result not ready") | |
|
26 | return f(self, *args, **kwargs) | |
|
27 | ||
|
19 | 28 | class AsyncResult(object): |
|
20 | 29 | """Class for representing results of non-blocking calls. |
|
21 | 30 | |
@@ -79,6 +88,7 class AsyncResult(object): | |||
|
79 | 88 | if self._ready: |
|
80 | 89 | try: |
|
81 | 90 | results = map(self._client.results.get, self.msg_ids) |
|
91 | self._result = results | |
|
82 | 92 | results = error.collect_exceptions(results, self._fname) |
|
83 | 93 | self._result = self._reconstruct_result(results) |
|
84 | 94 | except Exception, e: |
@@ -86,6 +96,8 class AsyncResult(object): | |||
|
86 | 96 | self._success = False |
|
87 | 97 | else: |
|
88 | 98 | self._success = True |
|
99 | finally: | |
|
100 | self._metadata = map(self._client.metadata.get, self.msg_ids) | |
|
89 | 101 | |
|
90 | 102 | |
|
91 | 103 | def successful(self): |
@@ -95,7 +107,67 class AsyncResult(object): | |||
|
95 | 107 | """ |
|
96 | 108 | assert self._ready |
|
97 | 109 | return self._success |
|
110 | ||
|
111 | #---------------------------------------------------------------- | |
|
112 | # Extra methods not in mp.pool.AsyncResult | |
|
113 | #---------------------------------------------------------------- | |
|
114 | ||
|
115 | def get_dict(self, timeout=-1): | |
|
116 | """Get the results as a dict, keyed by engine_id.""" | |
|
117 | results = self.get(timeout) | |
|
118 | engine_ids = [md['engine_id'] for md in self._metadata ] | |
|
119 | bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k)) | |
|
120 | maxcount = bycount.count(bycount[-1]) | |
|
121 | if maxcount > 1: | |
|
122 | raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%( | |
|
123 | maxcount, bycount[-1])) | |
|
124 | ||
|
125 | return dict(zip(engine_ids,results)) | |
|
126 | ||
|
127 | @property | |
|
128 | @check_ready | |
|
129 | def result(self): | |
|
130 | """result property.""" | |
|
131 | return self._result | |
|
132 | ||
|
133 | @property | |
|
134 | @check_ready | |
|
135 | def metadata(self): | |
|
136 | """metadata property.""" | |
|
137 | return self._metadata | |
|
138 | ||
|
139 | @property | |
|
140 | @check_ready | |
|
141 | def result_dict(self): | |
|
142 | """result property as a dict.""" | |
|
143 | return self.get_dict(0) | |
|
144 | ||
|
145 | #------------------------------------- | |
|
146 | # dict-access | |
|
147 | #------------------------------------- | |
|
148 | ||
|
149 | @check_ready | |
|
150 | def __getitem__(self, key): | |
|
151 | """getitem returns result value(s) if keyed by int/slice, or metadata if key is str. | |
|
152 | """ | |
|
153 | if isinstance(key, int): | |
|
154 | return error.collect_exceptions([self._result[key]], self._fname)[0] | |
|
155 | elif isinstance(key, slice): | |
|
156 | return error.collect_exceptions(self._result[key], self._fname) | |
|
157 | elif isinstance(key, basestring): | |
|
158 | return [ md[key] for md in self._metadata ] | |
|
159 | else: | |
|
160 | raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key)) | |
|
161 | ||
|
162 | @check_ready | |
|
163 | def __getattr__(self, key): | |
|
164 | """getattr maps to getitem for convenient access to metadata.""" | |
|
165 | if key not in self._metadata[0].keys(): | |
|
166 | raise AttributeError("%r object has no attribute %r"%( | |
|
167 | self.__class__.__name__, key)) | |
|
168 | return self.__getitem__(key) | |
|
98 | 169 | |
|
170 | ||
|
99 | 171 | class AsyncMapResult(AsyncResult): |
|
100 | 172 | """Class for representing results of non-blocking gathers. |
|
101 | 173 | |
@@ -111,3 +183,4 class AsyncMapResult(AsyncResult): | |||
|
111 | 183 | return self._mapObject.joinPartitions(res) |
|
112 | 184 | |
|
113 | 185 | |
|
186 | __all__ = ['AsyncResult', 'AsyncMapResult'] No newline at end of file |
@@ -250,7 +250,7 class CompositeError(KernelError): | |||
|
250 | 250 | et,ev,tb = sys.exc_info() |
|
251 | 251 | |
|
252 | 252 | |
|
253 | def collect_exceptions(rdict_or_list, method): | |
|
253 | def collect_exceptions(rdict_or_list, method='unspecified'): | |
|
254 | 254 | """check a result dict for errors, and raise CompositeError if any exist. |
|
255 | 255 | Passthrough otherwise.""" |
|
256 | 256 | elist = [] |
General Comments 0
You need to be logged in to leave comments.
Login now