##// END OF EJS Templates
add `%%px --local` for local execution...
MinRK -
Show More
@@ -1,424 +1,430
1 1 # encoding: utf-8
2 2 """
3 3 =============
4 4 parallelmagic
5 5 =============
6 6
7 7 Magic command interface for interactive parallel work.
8 8
9 9 Usage
10 10 =====
11 11
12 12 ``%autopx``
13 13
14 14 {AUTOPX_DOC}
15 15
16 16 ``%px``
17 17
18 18 {PX_DOC}
19 19
20 20 ``%pxresult``
21 21
22 22 {RESULT_DOC}
23 23
24 24 ``%pxconfig``
25 25
26 26 {CONFIG_DOC}
27 27
28 28 """
29 29
30 30 #-----------------------------------------------------------------------------
31 31 # Copyright (C) 2008 The IPython Development Team
32 32 #
33 33 # Distributed under the terms of the BSD License. The full license is in
34 34 # the file COPYING, distributed as part of this software.
35 35 #-----------------------------------------------------------------------------
36 36
37 37 #-----------------------------------------------------------------------------
38 38 # Imports
39 39 #-----------------------------------------------------------------------------
40 40
41 41 import ast
42 42 import re
43 43
44 44 from IPython.core.error import UsageError
45 45 from IPython.core.magic import Magics
46 46 from IPython.core import magic_arguments
47 47 from IPython.testing.skipdoctest import skip_doctest
48 48
49 49 #-----------------------------------------------------------------------------
50 50 # Definitions of magic functions for use with IPython
51 51 #-----------------------------------------------------------------------------
52 52
53 53
54 54 NO_LAST_RESULT = "%pxresult recalls last %px result, which has not yet been used."
55 55
56 56 def exec_args(f):
57 57 """decorator for adding block/targets args for execution
58 58
59 59 applied to %pxconfig and %%px
60 60 """
61 61 args = [
62 62 magic_arguments.argument('-b', '--block', action="store_const",
63 63 const=True, dest='block',
64 64 help="use blocking (sync) execution",
65 65 ),
66 66 magic_arguments.argument('-a', '--noblock', action="store_const",
67 67 const=False, dest='block',
68 68 help="use non-blocking (async) execution",
69 69 ),
70 70 magic_arguments.argument('-t', '--targets', type=str,
71 71 help="specify the targets on which to execute",
72 72 ),
73 magic_arguments.argument('--local', action="store_const",
74 const=True, dest="local",
75 help="also execute the cell in the local namespace",
76 ),
73 77 magic_arguments.argument('--verbose', action="store_const",
74 78 const=True, dest="set_verbose",
75 79 help="print a message at each execution",
76 80 ),
77 81 magic_arguments.argument('--no-verbose', action="store_const",
78 82 const=False, dest="set_verbose",
79 83 help="don't print any messages",
80 84 ),
81 85 ]
82 86 for a in args:
83 87 f = a(f)
84 88 return f
85 89
86 90 def output_args(f):
87 91 """decorator for output-formatting args
88 92
89 93 applied to %pxresult and %%px
90 94 """
91 95 args = [
92 96 magic_arguments.argument('-r', action="store_const", dest='groupby',
93 97 const='order',
94 98 help="collate outputs in order (same as group-outputs=order)"
95 99 ),
96 100 magic_arguments.argument('-e', action="store_const", dest='groupby',
97 101 const='engine',
98 102 help="group outputs by engine (same as group-outputs=engine)"
99 103 ),
100 104 magic_arguments.argument('--group-outputs', dest='groupby', type=str,
101 105 choices=['engine', 'order', 'type'], default='type',
102 106 help="""Group the outputs in a particular way.
103 107
104 108 Choices are:
105 109
106 110 type: group outputs of all engines by type (stdout, stderr, displaypub, etc.).
107 111
108 112 engine: display all output for each engine together.
109 113
110 114 order: like type, but individual displaypub output from each engine is collated.
111 115 For example, if multiple plots are generated by each engine, the first
112 116 figure of each engine will be displayed, then the second of each, etc.
113 117 """
114 118 ),
115 119 magic_arguments.argument('-o', '--out', dest='save_name', type=str,
116 120 help="""store the AsyncResult object for this computation
117 121 in the global namespace under this name.
118 122 """
119 123 ),
120 124 ]
121 125 for a in args:
122 126 f = a(f)
123 127 return f
124 128
125 129 class ParallelMagics(Magics):
126 130 """A set of magics useful when controlling a parallel IPython cluster.
127 131 """
128 132
129 133 # magic-related
130 134 magics = None
131 135 registered = True
132 136
133 137 # suffix for magics
134 138 suffix = ''
135 139 # A flag showing if autopx is activated or not
136 140 _autopx = False
137 141 # the current view used by the magics:
138 142 view = None
139 143 # last result cache for %pxresult
140 144 last_result = None
141 145 # verbose flag
142 146 verbose = False
143 147
144 148 def __init__(self, shell, view, suffix=''):
145 149 self.view = view
146 150 self.suffix = suffix
147 151
148 152 # register magics
149 153 self.magics = dict(cell={},line={})
150 154 line_magics = self.magics['line']
151 155
152 156 px = 'px' + suffix
153 157 if not suffix:
154 158 # keep %result for legacy compatibility
155 159 line_magics['result'] = self.result
156 160
157 161 line_magics['pxresult' + suffix] = self.result
158 162 line_magics[px] = self.px
159 163 line_magics['pxconfig' + suffix] = self.pxconfig
160 164 line_magics['auto' + px] = self.autopx
161 165
162 166 self.magics['cell'][px] = self.cell_px
163 167
164 168 super(ParallelMagics, self).__init__(shell=shell)
165 169
166 170 def _eval_target_str(self, ts):
167 171 if ':' in ts:
168 172 targets = eval("self.view.client.ids[%s]" % ts)
169 173 elif 'all' in ts:
170 174 targets = 'all'
171 175 else:
172 176 targets = eval(ts)
173 177 return targets
174 178
175 179 @magic_arguments.magic_arguments()
176 180 @exec_args
177 181 def pxconfig(self, line):
178 182 """configure default targets/blocking for %px magics"""
179 183 args = magic_arguments.parse_argstring(self.pxconfig, line)
180 184 if args.targets:
181 185 self.view.targets = self._eval_target_str(args.targets)
182 186 if args.block is not None:
183 187 self.view.block = args.block
184 188 if args.set_verbose is not None:
185 189 self.verbose = args.set_verbose
186 190
187 191 @magic_arguments.magic_arguments()
188 192 @output_args
189 193 @skip_doctest
190 194 def result(self, line=''):
191 195 """Print the result of the last asynchronous %px command.
192 196
193 197 This lets you recall the results of %px computations after
194 198 asynchronous submission (block=False).
195 199
196 200 Examples
197 201 --------
198 202 ::
199 203
200 204 In [23]: %px os.getpid()
201 205 Async parallel execution on engine(s): all
202 206
203 207 In [24]: %pxresult
204 208 Out[8:10]: 60920
205 209 Out[9:10]: 60921
206 210 Out[10:10]: 60922
207 211 Out[11:10]: 60923
208 212 """
209 213 args = magic_arguments.parse_argstring(self.result, line)
210 214
211 215 if self.last_result is None:
212 216 raise UsageError(NO_LAST_RESULT)
213 217
214 218 self.last_result.get()
215 219 self.last_result.display_outputs(groupby=args.groupby)
216 220
217 221 @skip_doctest
218 222 def px(self, line=''):
219 223 """Executes the given python command in parallel.
220 224
221 225 Examples
222 226 --------
223 227 ::
224 228
225 229 In [24]: %px a = os.getpid()
226 230 Parallel execution on engine(s): all
227 231
228 232 In [25]: %px print a
229 233 [stdout:0] 1234
230 234 [stdout:1] 1235
231 235 [stdout:2] 1236
232 236 [stdout:3] 1237
233 237 """
234 238 return self.parallel_execute(line)
235 239
236 240 def parallel_execute(self, cell, block=None, groupby='type', save_name=None):
237 241 """implementation used by %px and %%parallel"""
238 242
239 243 # defaults:
240 244 block = self.view.block if block is None else block
241 245
242 246 base = "Parallel" if block else "Async parallel"
243 247
244 248 targets = self.view.targets
245 249 if isinstance(targets, list) and len(targets) > 10:
246 250 str_targets = str(targets[:4])[:-1] + ', ..., ' + str(targets[-4:])[1:]
247 251 else:
248 252 str_targets = str(targets)
249 253 if self.verbose:
250 254 print base + " execution on engine(s): %s" % str_targets
251 255
252 256 result = self.view.execute(cell, silent=False, block=False)
253 257 self.last_result = result
254 258
255 259 if save_name:
256 260 self.shell.user_ns[save_name] = result
257 261
258 262 if block:
259 263 result.get()
260 264 result.display_outputs(groupby)
261 265 else:
262 266 # return AsyncResult only on non-blocking submission
263 267 return result
264 268
265 269 @magic_arguments.magic_arguments()
266 270 @exec_args
267 271 @output_args
268 272 @skip_doctest
269 273 def cell_px(self, line='', cell=None):
270 274 """Executes the cell in parallel.
271 275
272 276 Examples
273 277 --------
274 278 ::
275 279
276 280 In [24]: %%px --noblock
277 281 ....: a = os.getpid()
278 282 Async parallel execution on engine(s): all
279 283
280 284 In [25]: %%px
281 285 ....: print a
282 286 [stdout:0] 1234
283 287 [stdout:1] 1235
284 288 [stdout:2] 1236
285 289 [stdout:3] 1237
286 290 """
287 291
288 292 args = magic_arguments.parse_argstring(self.cell_px, line)
289 293
290 294 if args.targets:
291 295 save_targets = self.view.targets
292 296 self.view.targets = self._eval_target_str(args.targets)
297 if args.local:
298 self.shell.run_cell(cell)
293 299 try:
294 300 return self.parallel_execute(cell, block=args.block,
295 301 groupby=args.groupby,
296 302 save_name=args.save_name,
297 303 )
298 304 finally:
299 305 if args.targets:
300 306 self.view.targets = save_targets
301 307
302 308 @skip_doctest
303 309 def autopx(self, line=''):
304 310 """Toggles auto parallel mode.
305 311
306 312 Once this is called, all commands typed at the command line are send to
307 313 the engines to be executed in parallel. To control which engine are
308 314 used, the ``targets`` attribute of the view before
309 315 entering ``%autopx`` mode.
310 316
311 317
312 318 Then you can do the following::
313 319
314 320 In [25]: %autopx
315 321 %autopx to enabled
316 322
317 323 In [26]: a = 10
318 324 Parallel execution on engine(s): [0,1,2,3]
319 325 In [27]: print a
320 326 Parallel execution on engine(s): [0,1,2,3]
321 327 [stdout:0] 10
322 328 [stdout:1] 10
323 329 [stdout:2] 10
324 330 [stdout:3] 10
325 331
326 332
327 333 In [27]: %autopx
328 334 %autopx disabled
329 335 """
330 336 if self._autopx:
331 337 self._disable_autopx()
332 338 else:
333 339 self._enable_autopx()
334 340
335 341 def _enable_autopx(self):
336 342 """Enable %autopx mode by saving the original run_cell and installing
337 343 pxrun_cell.
338 344 """
339 345 # override run_cell
340 346 self._original_run_cell = self.shell.run_cell
341 347 self.shell.run_cell = self.pxrun_cell
342 348
343 349 self._autopx = True
344 350 print "%autopx enabled"
345 351
346 352 def _disable_autopx(self):
347 353 """Disable %autopx by restoring the original InteractiveShell.run_cell.
348 354 """
349 355 if self._autopx:
350 356 self.shell.run_cell = self._original_run_cell
351 357 self._autopx = False
352 358 print "%autopx disabled"
353 359
354 360 def pxrun_cell(self, raw_cell, store_history=False, silent=False):
355 361 """drop-in replacement for InteractiveShell.run_cell.
356 362
357 363 This executes code remotely, instead of in the local namespace.
358 364
359 365 See InteractiveShell.run_cell for details.
360 366 """
361 367
362 368 if (not raw_cell) or raw_cell.isspace():
363 369 return
364 370
365 371 ipself = self.shell
366 372
367 373 with ipself.builtin_trap:
368 374 cell = ipself.prefilter_manager.prefilter_lines(raw_cell)
369 375
370 376 # Store raw and processed history
371 377 if store_history:
372 378 ipself.history_manager.store_inputs(ipself.execution_count,
373 379 cell, raw_cell)
374 380
375 381 # ipself.logger.log(cell, raw_cell)
376 382
377 383 cell_name = ipself.compile.cache(cell, ipself.execution_count)
378 384
379 385 try:
380 386 ast.parse(cell, filename=cell_name)
381 387 except (OverflowError, SyntaxError, ValueError, TypeError,
382 388 MemoryError):
383 389 # Case 1
384 390 ipself.showsyntaxerror()
385 391 ipself.execution_count += 1
386 392 return None
387 393 except NameError:
388 394 # ignore name errors, because we don't know the remote keys
389 395 pass
390 396
391 397 if store_history:
392 398 # Write output to the database. Does nothing unless
393 399 # history output logging is enabled.
394 400 ipself.history_manager.store_output(ipself.execution_count)
395 401 # Each cell is a *single* input, regardless of how many lines it has
396 402 ipself.execution_count += 1
397 403 if re.search(r'get_ipython\(\)\.magic\(u?["\']%?autopx', cell):
398 404 self._disable_autopx()
399 405 return False
400 406 else:
401 407 try:
402 408 result = self.view.execute(cell, silent=False, block=False)
403 409 except:
404 410 ipself.showtraceback()
405 411 return True
406 412 else:
407 413 if self.view.block:
408 414 try:
409 415 result.get()
410 416 except:
411 417 self.shell.showtraceback()
412 418 return True
413 419 else:
414 420 with ipself.builtin_trap:
415 421 result.display_outputs()
416 422 return False
417 423
418 424
419 425 __doc__ = __doc__.format(
420 426 AUTOPX_DOC = ' '*8 + ParallelMagics.autopx.__doc__,
421 427 PX_DOC = ' '*8 + ParallelMagics.px.__doc__,
422 428 RESULT_DOC = ' '*8 + ParallelMagics.result.__doc__,
423 429 CONFIG_DOC = ' '*8 + ParallelMagics.pxconfig.__doc__,
424 430 )
General Comments 0
You need to be logged in to leave comments. Login now