##// END OF EJS Templates
%%px --local runs local last...
MinRK -
Show More
@@ -1,430 +1,440 b''
1 1 # encoding: utf-8
2 2 """
3 3 =============
4 4 parallelmagic
5 5 =============
6 6
7 7 Magic command interface for interactive parallel work.
8 8
9 9 Usage
10 10 =====
11 11
12 12 ``%autopx``
13 13
14 14 {AUTOPX_DOC}
15 15
16 16 ``%px``
17 17
18 18 {PX_DOC}
19 19
20 20 ``%pxresult``
21 21
22 22 {RESULT_DOC}
23 23
24 24 ``%pxconfig``
25 25
26 26 {CONFIG_DOC}
27 27
28 28 """
29 29
30 30 #-----------------------------------------------------------------------------
31 31 # Copyright (C) 2008 The IPython Development Team
32 32 #
33 33 # Distributed under the terms of the BSD License. The full license is in
34 34 # the file COPYING, distributed as part of this software.
35 35 #-----------------------------------------------------------------------------
36 36
37 37 #-----------------------------------------------------------------------------
38 38 # Imports
39 39 #-----------------------------------------------------------------------------
40 40
41 41 import ast
42 42 import re
43 43
44 44 from IPython.core.error import UsageError
45 45 from IPython.core.magic import Magics
46 46 from IPython.core import magic_arguments
47 47 from IPython.testing.skipdoctest import skip_doctest
48 48
49 49 #-----------------------------------------------------------------------------
50 50 # Definitions of magic functions for use with IPython
51 51 #-----------------------------------------------------------------------------
52 52
53 53
54 54 NO_LAST_RESULT = "%pxresult recalls last %px result, which has not yet been used."
55 55
56 56 def exec_args(f):
57 57 """decorator for adding block/targets args for execution
58 58
59 59 applied to %pxconfig and %%px
60 60 """
61 61 args = [
62 62 magic_arguments.argument('-b', '--block', action="store_const",
63 63 const=True, dest='block',
64 64 help="use blocking (sync) execution",
65 65 ),
66 66 magic_arguments.argument('-a', '--noblock', action="store_const",
67 67 const=False, dest='block',
68 68 help="use non-blocking (async) execution",
69 69 ),
70 70 magic_arguments.argument('-t', '--targets', type=str,
71 71 help="specify the targets on which to execute",
72 72 ),
73 73 magic_arguments.argument('--local', action="store_const",
74 74 const=True, dest="local",
75 75 help="also execute the cell in the local namespace",
76 76 ),
77 77 magic_arguments.argument('--verbose', action="store_const",
78 78 const=True, dest="set_verbose",
79 79 help="print a message at each execution",
80 80 ),
81 81 magic_arguments.argument('--no-verbose', action="store_const",
82 82 const=False, dest="set_verbose",
83 83 help="don't print any messages",
84 84 ),
85 85 ]
86 86 for a in args:
87 87 f = a(f)
88 88 return f
89 89
90 90 def output_args(f):
91 91 """decorator for output-formatting args
92 92
93 93 applied to %pxresult and %%px
94 94 """
95 95 args = [
96 96 magic_arguments.argument('-r', action="store_const", dest='groupby',
97 97 const='order',
98 98 help="collate outputs in order (same as group-outputs=order)"
99 99 ),
100 100 magic_arguments.argument('-e', action="store_const", dest='groupby',
101 101 const='engine',
102 102 help="group outputs by engine (same as group-outputs=engine)"
103 103 ),
104 104 magic_arguments.argument('--group-outputs', dest='groupby', type=str,
105 105 choices=['engine', 'order', 'type'], default='type',
106 106 help="""Group the outputs in a particular way.
107 107
108 108 Choices are:
109 109
110 110 type: group outputs of all engines by type (stdout, stderr, displaypub, etc.).
111 111
112 112 engine: display all output for each engine together.
113 113
114 114 order: like type, but individual displaypub output from each engine is collated.
115 115 For example, if multiple plots are generated by each engine, the first
116 116 figure of each engine will be displayed, then the second of each, etc.
117 117 """
118 118 ),
119 119 magic_arguments.argument('-o', '--out', dest='save_name', type=str,
120 120 help="""store the AsyncResult object for this computation
121 121 in the global namespace under this name.
122 122 """
123 123 ),
124 124 ]
125 125 for a in args:
126 126 f = a(f)
127 127 return f
128 128
129 129 class ParallelMagics(Magics):
130 130 """A set of magics useful when controlling a parallel IPython cluster.
131 131 """
132 132
133 133 # magic-related
134 134 magics = None
135 135 registered = True
136 136
137 137 # suffix for magics
138 138 suffix = ''
139 139 # A flag showing if autopx is activated or not
140 140 _autopx = False
141 141 # the current view used by the magics:
142 142 view = None
143 143 # last result cache for %pxresult
144 144 last_result = None
145 145 # verbose flag
146 146 verbose = False
147 147
148 148 def __init__(self, shell, view, suffix=''):
149 149 self.view = view
150 150 self.suffix = suffix
151 151
152 152 # register magics
153 153 self.magics = dict(cell={},line={})
154 154 line_magics = self.magics['line']
155 155
156 156 px = 'px' + suffix
157 157 if not suffix:
158 158 # keep %result for legacy compatibility
159 159 line_magics['result'] = self.result
160 160
161 161 line_magics['pxresult' + suffix] = self.result
162 162 line_magics[px] = self.px
163 163 line_magics['pxconfig' + suffix] = self.pxconfig
164 164 line_magics['auto' + px] = self.autopx
165 165
166 166 self.magics['cell'][px] = self.cell_px
167 167
168 168 super(ParallelMagics, self).__init__(shell=shell)
169 169
170 170 def _eval_target_str(self, ts):
171 171 if ':' in ts:
172 172 targets = eval("self.view.client.ids[%s]" % ts)
173 173 elif 'all' in ts:
174 174 targets = 'all'
175 175 else:
176 176 targets = eval(ts)
177 177 return targets
178 178
179 179 @magic_arguments.magic_arguments()
180 180 @exec_args
181 181 def pxconfig(self, line):
182 182 """configure default targets/blocking for %px magics"""
183 183 args = magic_arguments.parse_argstring(self.pxconfig, line)
184 184 if args.targets:
185 185 self.view.targets = self._eval_target_str(args.targets)
186 186 if args.block is not None:
187 187 self.view.block = args.block
188 188 if args.set_verbose is not None:
189 189 self.verbose = args.set_verbose
190 190
191 191 @magic_arguments.magic_arguments()
192 192 @output_args
193 193 @skip_doctest
194 194 def result(self, line=''):
195 195 """Print the result of the last asynchronous %px command.
196 196
197 197 This lets you recall the results of %px computations after
198 198 asynchronous submission (block=False).
199 199
200 200 Examples
201 201 --------
202 202 ::
203 203
204 204 In [23]: %px os.getpid()
205 205 Async parallel execution on engine(s): all
206 206
207 207 In [24]: %pxresult
208 208 Out[8:10]: 60920
209 209 Out[9:10]: 60921
210 210 Out[10:10]: 60922
211 211 Out[11:10]: 60923
212 212 """
213 213 args = magic_arguments.parse_argstring(self.result, line)
214 214
215 215 if self.last_result is None:
216 216 raise UsageError(NO_LAST_RESULT)
217 217
218 218 self.last_result.get()
219 219 self.last_result.display_outputs(groupby=args.groupby)
220 220
221 221 @skip_doctest
222 222 def px(self, line=''):
223 223 """Executes the given python command in parallel.
224 224
225 225 Examples
226 226 --------
227 227 ::
228 228
229 229 In [24]: %px a = os.getpid()
230 230 Parallel execution on engine(s): all
231 231
232 232 In [25]: %px print a
233 233 [stdout:0] 1234
234 234 [stdout:1] 1235
235 235 [stdout:2] 1236
236 236 [stdout:3] 1237
237 237 """
238 238 return self.parallel_execute(line)
239 239
240 240 def parallel_execute(self, cell, block=None, groupby='type', save_name=None):
241 241 """implementation used by %px and %%parallel"""
242 242
243 243 # defaults:
244 244 block = self.view.block if block is None else block
245 245
246 246 base = "Parallel" if block else "Async parallel"
247 247
248 248 targets = self.view.targets
249 249 if isinstance(targets, list) and len(targets) > 10:
250 250 str_targets = str(targets[:4])[:-1] + ', ..., ' + str(targets[-4:])[1:]
251 251 else:
252 252 str_targets = str(targets)
253 253 if self.verbose:
254 254 print base + " execution on engine(s): %s" % str_targets
255 255
256 256 result = self.view.execute(cell, silent=False, block=False)
257 257 self.last_result = result
258 258
259 259 if save_name:
260 260 self.shell.user_ns[save_name] = result
261 261
262 262 if block:
263 263 result.get()
264 264 result.display_outputs(groupby)
265 265 else:
266 266 # return AsyncResult only on non-blocking submission
267 267 return result
268 268
269 269 @magic_arguments.magic_arguments()
270 270 @exec_args
271 271 @output_args
272 272 @skip_doctest
273 273 def cell_px(self, line='', cell=None):
274 274 """Executes the cell in parallel.
275 275
276 276 Examples
277 277 --------
278 278 ::
279 279
280 280 In [24]: %%px --noblock
281 281 ....: a = os.getpid()
282 282 Async parallel execution on engine(s): all
283 283
284 284 In [25]: %%px
285 285 ....: print a
286 286 [stdout:0] 1234
287 287 [stdout:1] 1235
288 288 [stdout:2] 1236
289 289 [stdout:3] 1237
290 290 """
291 291
292 292 args = magic_arguments.parse_argstring(self.cell_px, line)
293 293
294 294 if args.targets:
295 295 save_targets = self.view.targets
296 296 self.view.targets = self._eval_target_str(args.targets)
297 if args.local:
298 self.shell.run_cell(cell)
297 # if running local, don't block until after local has run
298 block = False if args.local else args.block
299 299 try:
300 return self.parallel_execute(cell, block=args.block,
300 ar = self.parallel_execute(cell, block=block,
301 301 groupby=args.groupby,
302 302 save_name=args.save_name,
303 303 )
304 304 finally:
305 305 if args.targets:
306 306 self.view.targets = save_targets
307
308 # run locally after submitting remote
309 if args.local:
310 self.shell.run_cell(cell)
311 # now apply blocking behavor to remote execution
312 block = self.view.block if args.block is None else args.block
313 if block:
314 ar.get()
315 ar.display_outputs(groupby)
316 return ar
307 317
308 318 @skip_doctest
309 319 def autopx(self, line=''):
310 320 """Toggles auto parallel mode.
311 321
312 322 Once this is called, all commands typed at the command line are send to
313 323 the engines to be executed in parallel. To control which engine are
314 324 used, the ``targets`` attribute of the view before
315 325 entering ``%autopx`` mode.
316 326
317 327
318 328 Then you can do the following::
319 329
320 330 In [25]: %autopx
321 331 %autopx to enabled
322 332
323 333 In [26]: a = 10
324 334 Parallel execution on engine(s): [0,1,2,3]
325 335 In [27]: print a
326 336 Parallel execution on engine(s): [0,1,2,3]
327 337 [stdout:0] 10
328 338 [stdout:1] 10
329 339 [stdout:2] 10
330 340 [stdout:3] 10
331 341
332 342
333 343 In [27]: %autopx
334 344 %autopx disabled
335 345 """
336 346 if self._autopx:
337 347 self._disable_autopx()
338 348 else:
339 349 self._enable_autopx()
340 350
341 351 def _enable_autopx(self):
342 352 """Enable %autopx mode by saving the original run_cell and installing
343 353 pxrun_cell.
344 354 """
345 355 # override run_cell
346 356 self._original_run_cell = self.shell.run_cell
347 357 self.shell.run_cell = self.pxrun_cell
348 358
349 359 self._autopx = True
350 360 print "%autopx enabled"
351 361
352 362 def _disable_autopx(self):
353 363 """Disable %autopx by restoring the original InteractiveShell.run_cell.
354 364 """
355 365 if self._autopx:
356 366 self.shell.run_cell = self._original_run_cell
357 367 self._autopx = False
358 368 print "%autopx disabled"
359 369
360 370 def pxrun_cell(self, raw_cell, store_history=False, silent=False):
361 371 """drop-in replacement for InteractiveShell.run_cell.
362 372
363 373 This executes code remotely, instead of in the local namespace.
364 374
365 375 See InteractiveShell.run_cell for details.
366 376 """
367 377
368 378 if (not raw_cell) or raw_cell.isspace():
369 379 return
370 380
371 381 ipself = self.shell
372 382
373 383 with ipself.builtin_trap:
374 384 cell = ipself.prefilter_manager.prefilter_lines(raw_cell)
375 385
376 386 # Store raw and processed history
377 387 if store_history:
378 388 ipself.history_manager.store_inputs(ipself.execution_count,
379 389 cell, raw_cell)
380 390
381 391 # ipself.logger.log(cell, raw_cell)
382 392
383 393 cell_name = ipself.compile.cache(cell, ipself.execution_count)
384 394
385 395 try:
386 396 ast.parse(cell, filename=cell_name)
387 397 except (OverflowError, SyntaxError, ValueError, TypeError,
388 398 MemoryError):
389 399 # Case 1
390 400 ipself.showsyntaxerror()
391 401 ipself.execution_count += 1
392 402 return None
393 403 except NameError:
394 404 # ignore name errors, because we don't know the remote keys
395 405 pass
396 406
397 407 if store_history:
398 408 # Write output to the database. Does nothing unless
399 409 # history output logging is enabled.
400 410 ipself.history_manager.store_output(ipself.execution_count)
401 411 # Each cell is a *single* input, regardless of how many lines it has
402 412 ipself.execution_count += 1
403 413 if re.search(r'get_ipython\(\)\.magic\(u?["\']%?autopx', cell):
404 414 self._disable_autopx()
405 415 return False
406 416 else:
407 417 try:
408 418 result = self.view.execute(cell, silent=False, block=False)
409 419 except:
410 420 ipself.showtraceback()
411 421 return True
412 422 else:
413 423 if self.view.block:
414 424 try:
415 425 result.get()
416 426 except:
417 427 self.shell.showtraceback()
418 428 return True
419 429 else:
420 430 with ipself.builtin_trap:
421 431 result.display_outputs()
422 432 return False
423 433
424 434
425 435 __doc__ = __doc__.format(
426 436 AUTOPX_DOC = ' '*8 + ParallelMagics.autopx.__doc__,
427 437 PX_DOC = ' '*8 + ParallelMagics.px.__doc__,
428 438 RESULT_DOC = ' '*8 + ParallelMagics.result.__doc__,
429 439 CONFIG_DOC = ' '*8 + ParallelMagics.pxconfig.__doc__,
430 440 )
General Comments 0
You need to be logged in to leave comments. Login now