Show More
@@ -36,7 +36,9 Usage | |||
|
36 | 36 | |
|
37 | 37 | import ast |
|
38 | 38 | import re |
|
39 | import sys | |
|
39 | 40 | |
|
41 | from IPython.core.display import display | |
|
40 | 42 | from IPython.core.error import UsageError |
|
41 | 43 | from IPython.core.magic import Magics, magics_class, line_magic |
|
42 | 44 | from IPython.testing.skipdoctest import skip_doctest |
@@ -53,7 +55,7 NO_ACTIVE_VIEW = "Use activate() on a DirectView object to use it with magics." | |||
|
53 | 55 | class ParallelMagics(Magics): |
|
54 | 56 | """A set of magics useful when controlling a parallel IPython cluster. |
|
55 | 57 | """ |
|
56 | ||
|
58 | ||
|
57 | 59 | # A flag showing if autopx is activated or not |
|
58 | 60 | _autopx = False |
|
59 | 61 | # the current view used by the magics: |
@@ -66,23 +68,36 class ParallelMagics(Magics): | |||
|
66 | 68 | |
|
67 | 69 | To use this a :class:`DirectView` instance must be created |
|
68 | 70 | and then activated by calling its :meth:`activate` method. |
|
71 | ||
|
72 | This lets you recall the results of %px computations after | |
|
73 | asynchronous submission (view.block=False). | |
|
69 | 74 | |
|
70 | 75 | Then you can do the following:: |
|
71 | 76 | |
|
72 |
In [23]: % |
|
|
73 | Out[23]: <AsyncResult: unknown> | |
|
77 | In [23]: %px os.getpid() | |
|
78 | Async parallel execution on engine(s): all | |
|
74 | 79 | |
|
75 |
In [2 |
|
|
80 | In [24]: %result | |
|
81 | [ 8] Out[10]: 60920 | |
|
82 | [ 9] Out[10]: 60921 | |
|
83 | [10] Out[10]: 60922 | |
|
84 | [11] Out[10]: 60923 | |
|
76 | 85 | """ |
|
86 | ||
|
77 | 87 | if self.active_view is None: |
|
78 | 88 | raise UsageError(NO_ACTIVE_VIEW) |
|
79 | ||
|
89 | ||
|
90 | stride = len(self.active_view) | |
|
80 | 91 | try: |
|
81 | 92 | index = int(parameter_s) |
|
82 | 93 | except: |
|
83 |
index = |
|
|
84 | result = self.active_view.get_result(index) | |
|
85 | return result | |
|
94 | index = -1 | |
|
95 | msg_ids = self.active_view.history[stride * index:(stride * (index + 1)) or None] | |
|
96 | ||
|
97 | result = self.active_view.get_result(msg_ids) | |
|
98 | ||
|
99 | result.get() | |
|
100 | self._display_result(result) | |
|
86 | 101 | |
|
87 | 102 | @skip_doctest |
|
88 | 103 | @line_magic |
@@ -106,11 +121,14 class ParallelMagics(Magics): | |||
|
106 | 121 | |
|
107 | 122 | if self.active_view is None: |
|
108 | 123 | raise UsageError(NO_ACTIVE_VIEW) |
|
109 | print "Parallel execution on engine(s): %s" % self.active_view.targets | |
|
110 | result = self.active_view.execute(parameter_s, block=False) | |
|
124 | ||
|
125 | base = "Parallel" if self.active_view.block else "Async parallel" | |
|
126 | print base + " execution on engine(s): %s" % self.active_view.targets | |
|
127 | ||
|
128 | result = self.active_view.execute(parameter_s, silent=False, block=False) | |
|
111 | 129 | if self.active_view.block: |
|
112 | 130 | result.get() |
|
113 |
self. |
|
|
131 | self._display_result(result) | |
|
114 | 132 | |
|
115 | 133 | @skip_doctest |
|
116 | 134 | @line_magic |
@@ -171,18 +189,21 class ParallelMagics(Magics): | |||
|
171 | 189 | |
|
172 | 190 | def _display_result(self, result): |
|
173 | 191 | """Display the output of a parallel result. |
|
174 | ||
|
175 | If self.active_view.block is True, wait for the result | |
|
176 | and display the result. Otherwise, this is a noop. | |
|
177 | 192 | """ |
|
193 | # flush iopub, just in case | |
|
194 | rc = self.active_view.client | |
|
195 | rc._flush_iopub(rc._iopub_socket) | |
|
196 | ||
|
178 | 197 | if result._single_result: |
|
179 | 198 | # single result |
|
180 | 199 | stdouts = [result.stdout.rstrip()] |
|
181 | 200 | stderrs = [result.stderr.rstrip()] |
|
201 | outputs = [result.outputs] | |
|
182 | 202 | else: |
|
183 | 203 | stdouts = [s.rstrip() for s in result.stdout] |
|
184 | 204 | stderrs = [s.rstrip() for s in result.stderr] |
|
185 | ||
|
205 | outputs = [outs for outs in result.outputs] | |
|
206 | ||
|
186 | 207 | results = result.get_dict() |
|
187 | 208 | |
|
188 | 209 | targets = self.active_view.targets |
@@ -190,10 +211,29 class ParallelMagics(Magics): | |||
|
190 | 211 | targets = [targets] |
|
191 | 212 | elif targets == 'all': |
|
192 | 213 | targets = self.active_view.client.ids |
|
193 | ||
|
214 | ||
|
215 | # republish stdout: | |
|
194 | 216 | if any(stdouts): |
|
195 | 217 | for eid,stdout in zip(targets, stdouts): |
|
196 | print '[stdout:%i]'%eid, stdout | |
|
218 | print '[stdout:%2i]' % eid, stdout | |
|
219 | ||
|
220 | # republish stderr: | |
|
221 | if any(stderrs): | |
|
222 | for eid,stderr in zip(targets, stderrs): | |
|
223 | print >> sys.stderr, '[stderr:%2i]' % eid, stderr | |
|
224 | ||
|
225 | # republish displaypub output | |
|
226 | for eid,e_outputs in zip(targets, outputs): | |
|
227 | for output in e_outputs: | |
|
228 | md = output['metadata'] or {} | |
|
229 | md['engine'] = eid | |
|
230 | self.shell.display_pub.publish(output['source'], output['data'], md) | |
|
231 | ||
|
232 | # finally, add pyout: | |
|
233 | for eid in targets: | |
|
234 | r = results[eid] | |
|
235 | if r.pyout: | |
|
236 | display(r) | |
|
197 | 237 | |
|
198 | 238 | |
|
199 | 239 | def pxrun_cell(self, raw_cell, store_history=False, silent=False): |
@@ -256,7 +296,7 class ParallelMagics(Magics): | |||
|
256 | 296 | self.shell.showtraceback() |
|
257 | 297 | return True |
|
258 | 298 | else: |
|
259 |
self. |
|
|
299 | self._display_result(result) | |
|
260 | 300 | return False |
|
261 | 301 | |
|
262 | 302 |
General Comments 0
You need to be logged in to leave comments.
Login now