magics.py
424 lines
| 12.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
r7473 | # encoding: utf-8 | ||
""" | ||||
============= | ||||
parallelmagic | ||||
============= | ||||
Magic command interface for interactive parallel work. | ||||
Usage | ||||
===== | ||||
``%autopx`` | ||||
{AUTOPX_DOC} | ||||
``%px`` | ||||
{PX_DOC} | ||||
MinRK
|
r7476 | ``%pxresult`` | ||
MinRK
|
r7473 | |||
{RESULT_DOC} | ||||
MinRK
|
r7476 | ``%pxconfig`` | ||
{CONFIG_DOC} | ||||
MinRK
|
r7473 | """ | ||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2008 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
import ast | ||||
import re | ||||
from IPython.core.error import UsageError | ||||
MinRK
|
r7476 | from IPython.core.magic import Magics | ||
from IPython.core import magic_arguments | ||||
MinRK
|
r7473 | from IPython.testing.skipdoctest import skip_doctest | ||
#----------------------------------------------------------------------------- | ||||
# Definitions of magic functions for use with IPython | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r7476 | NO_LAST_RESULT = "%pxresult recalls last %px result, which has not yet been used." | ||
MinRK
|
r7473 | |||
MinRK
|
r7476 | def exec_args(f): | ||
"""decorator for adding block/targets args for execution | ||||
applied to %pxconfig and %%px | ||||
""" | ||||
args = [ | ||||
magic_arguments.argument('-b', '--block', action="store_const", | ||||
const=True, dest='block', | ||||
MinRK
|
r7501 | help="use blocking (sync) execution", | ||
MinRK
|
r7476 | ), | ||
magic_arguments.argument('-a', '--noblock', action="store_const", | ||||
const=False, dest='block', | ||||
MinRK
|
r7501 | help="use non-blocking (async) execution", | ||
MinRK
|
r7476 | ), | ||
magic_arguments.argument('-t', '--targets', type=str, | ||||
MinRK
|
r7501 | help="specify the targets on which to execute", | ||
), | ||||
magic_arguments.argument('--verbose', action="store_const", | ||||
const=True, dest="set_verbose", | ||||
help="print a message at each execution", | ||||
), | ||||
magic_arguments.argument('--no-verbose', action="store_const", | ||||
const=False, dest="set_verbose", | ||||
help="don't print any messages", | ||||
MinRK
|
r7476 | ), | ||
] | ||||
for a in args: | ||||
f = a(f) | ||||
return f | ||||
def output_args(f): | ||||
"""decorator for output-formatting args | ||||
applied to %pxresult and %%px | ||||
""" | ||||
args = [ | ||||
magic_arguments.argument('-r', action="store_const", dest='groupby', | ||||
const='order', | ||||
help="collate outputs in order (same as group-outputs=order)" | ||||
), | ||||
magic_arguments.argument('-e', action="store_const", dest='groupby', | ||||
const='engine', | ||||
help="group outputs by engine (same as group-outputs=engine)" | ||||
), | ||||
magic_arguments.argument('--group-outputs', dest='groupby', type=str, | ||||
choices=['engine', 'order', 'type'], default='type', | ||||
help="""Group the outputs in a particular way. | ||||
Choices are: | ||||
type: group outputs of all engines by type (stdout, stderr, displaypub, etc.). | ||||
engine: display all output for each engine together. | ||||
order: like type, but individual displaypub output from each engine is collated. | ||||
For example, if multiple plots are generated by each engine, the first | ||||
figure of each engine will be displayed, then the second of each, etc. | ||||
""" | ||||
), | ||||
magic_arguments.argument('-o', '--out', dest='save_name', type=str, | ||||
help="""store the AsyncResult object for this computation | ||||
in the global namespace under this name. | ||||
""" | ||||
), | ||||
] | ||||
for a in args: | ||||
f = a(f) | ||||
return f | ||||
MinRK
|
r7473 | |||
class ParallelMagics(Magics): | ||||
"""A set of magics useful when controlling a parallel IPython cluster. | ||||
""" | ||||
MinRK
|
r7476 | # magic-related | ||
magics = None | ||||
registered = True | ||||
# suffix for magics | ||||
suffix = '' | ||||
MinRK
|
r7473 | # A flag showing if autopx is activated or not | ||
_autopx = False | ||||
# the current view used by the magics: | ||||
MinRK
|
r7476 | view = None | ||
# last result cache for %pxresult | ||||
MinRK
|
r7473 | last_result = None | ||
MinRK
|
r7501 | # verbose flag | ||
verbose = False | ||||
MinRK
|
r7473 | |||
MinRK
|
r7476 | def __init__(self, shell, view, suffix=''): | ||
self.view = view | ||||
self.suffix = suffix | ||||
MinRK
|
r7473 | |||
MinRK
|
r7476 | # register magics | ||
self.magics = dict(cell={},line={}) | ||||
line_magics = self.magics['line'] | ||||
MinRK
|
r7473 | |||
MinRK
|
r7476 | px = 'px' + suffix | ||
if not suffix: | ||||
# keep %result for legacy compatibility | ||||
line_magics['result'] = self.result | ||||
MinRK
|
r7473 | |||
MinRK
|
r7476 | line_magics['pxresult' + suffix] = self.result | ||
line_magics[px] = self.px | ||||
line_magics['pxconfig' + suffix] = self.pxconfig | ||||
line_magics['auto' + px] = self.autopx | ||||
MinRK
|
r7473 | |||
MinRK
|
r7476 | self.magics['cell'][px] = self.cell_px | ||
MinRK
|
r7473 | |||
MinRK
|
r7476 | super(ParallelMagics, self).__init__(shell=shell) | ||
def _eval_target_str(self, ts): | ||||
if ':' in ts: | ||||
targets = eval("self.view.client.ids[%s]" % ts) | ||||
elif 'all' in ts: | ||||
targets = 'all' | ||||
else: | ||||
targets = eval(ts) | ||||
return targets | ||||
@magic_arguments.magic_arguments() | ||||
@exec_args | ||||
def pxconfig(self, line): | ||||
"""configure default targets/blocking for %px magics""" | ||||
args = magic_arguments.parse_argstring(self.pxconfig, line) | ||||
if args.targets: | ||||
self.view.targets = self._eval_target_str(args.targets) | ||||
if args.block is not None: | ||||
self.view.block = args.block | ||||
MinRK
|
r7501 | if args.set_verbose is not None: | ||
self.verbose = args.set_verbose | ||||
MinRK
|
r7476 | |||
@magic_arguments.magic_arguments() | ||||
@output_args | ||||
@skip_doctest | ||||
def result(self, line=''): | ||||
"""Print the result of the last asynchronous %px command. | ||||
MinRK
|
r7473 | |||
This lets you recall the results of %px computations after | ||||
MinRK
|
r7476 | asynchronous submission (block=False). | ||
MinRK
|
r7473 | |||
MinRK
|
r7476 | Examples | ||
-------- | ||||
:: | ||||
MinRK
|
r7473 | |||
In [23]: %px os.getpid() | ||||
Async parallel execution on engine(s): all | ||||
MinRK
|
r7476 | In [24]: %pxresult | ||
MinRK
|
r7495 | Out[8:10]: 60920 | ||
Out[9:10]: 60921 | ||||
Out[10:10]: 60922 | ||||
Out[11:10]: 60923 | ||||
MinRK
|
r7473 | """ | ||
MinRK
|
r7476 | args = magic_arguments.parse_argstring(self.result, line) | ||
MinRK
|
r7473 | |||
if self.last_result is None: | ||||
raise UsageError(NO_LAST_RESULT) | ||||
self.last_result.get() | ||||
MinRK
|
r7476 | self.last_result.display_outputs(groupby=args.groupby) | ||
MinRK
|
r7473 | |||
@skip_doctest | ||||
MinRK
|
r7476 | def px(self, line=''): | ||
MinRK
|
r7473 | """Executes the given python command in parallel. | ||
MinRK
|
r7476 | Examples | ||
-------- | ||||
:: | ||||
MinRK
|
r7473 | |||
In [24]: %px a = os.getpid() | ||||
Parallel execution on engine(s): all | ||||
In [25]: %px print a | ||||
[stdout:0] 1234 | ||||
[stdout:1] 1235 | ||||
[stdout:2] 1236 | ||||
[stdout:3] 1237 | ||||
""" | ||||
MinRK
|
r7476 | return self.parallel_execute(line) | ||
MinRK
|
r7473 | |||
def parallel_execute(self, cell, block=None, groupby='type', save_name=None): | ||||
"""implementation used by %px and %%parallel""" | ||||
# defaults: | ||||
MinRK
|
r7476 | block = self.view.block if block is None else block | ||
MinRK
|
r7473 | |||
base = "Parallel" if block else "Async parallel" | ||||
MinRK
|
r7476 | targets = self.view.targets | ||
MinRK
|
r7473 | if isinstance(targets, list) and len(targets) > 10: | ||
str_targets = str(targets[:4])[:-1] + ', ..., ' + str(targets[-4:])[1:] | ||||
else: | ||||
str_targets = str(targets) | ||||
MinRK
|
r7501 | if self.verbose: | ||
print base + " execution on engine(s): %s" % str_targets | ||||
MinRK
|
r7473 | |||
MinRK
|
r7476 | result = self.view.execute(cell, silent=False, block=False) | ||
MinRK
|
r7473 | self.last_result = result | ||
if save_name: | ||||
self.shell.user_ns[save_name] = result | ||||
if block: | ||||
result.get() | ||||
result.display_outputs(groupby) | ||||
else: | ||||
# return AsyncResult only on non-blocking submission | ||||
return result | ||||
MinRK
|
r7476 | @magic_arguments.magic_arguments() | ||
@exec_args | ||||
@output_args | ||||
MinRK
|
r7473 | @skip_doctest | ||
def cell_px(self, line='', cell=None): | ||||
MinRK
|
r7476 | """Executes the cell in parallel. | ||
MinRK
|
r7473 | |||
MinRK
|
r7476 | Examples | ||
-------- | ||||
:: | ||||
MinRK
|
r7473 | |||
In [24]: %%px --noblock | ||||
....: a = os.getpid() | ||||
Async parallel execution on engine(s): all | ||||
In [25]: %%px | ||||
....: print a | ||||
[stdout:0] 1234 | ||||
[stdout:1] 1235 | ||||
[stdout:2] 1236 | ||||
[stdout:3] 1237 | ||||
""" | ||||
MinRK
|
r7476 | args = magic_arguments.parse_argstring(self.cell_px, line) | ||
if args.targets: | ||||
save_targets = self.view.targets | ||||
self.view.targets = self._eval_target_str(args.targets) | ||||
try: | ||||
return self.parallel_execute(cell, block=args.block, | ||||
groupby=args.groupby, | ||||
save_name=args.save_name, | ||||
) | ||||
finally: | ||||
if args.targets: | ||||
self.view.targets = save_targets | ||||
MinRK
|
r7473 | @skip_doctest | ||
MinRK
|
r7476 | def autopx(self, line=''): | ||
MinRK
|
r7473 | """Toggles auto parallel mode. | ||
MinRK
|
r7476 | Once this is called, all commands typed at the command line are send to | ||
the engines to be executed in parallel. To control which engine are | ||||
used, the ``targets`` attribute of the view before | ||||
entering ``%autopx`` mode. | ||||
MinRK
|
r7473 | |||
Then you can do the following:: | ||||
In [25]: %autopx | ||||
%autopx to enabled | ||||
In [26]: a = 10 | ||||
Parallel execution on engine(s): [0,1,2,3] | ||||
In [27]: print a | ||||
Parallel execution on engine(s): [0,1,2,3] | ||||
[stdout:0] 10 | ||||
[stdout:1] 10 | ||||
[stdout:2] 10 | ||||
[stdout:3] 10 | ||||
In [27]: %autopx | ||||
%autopx disabled | ||||
""" | ||||
if self._autopx: | ||||
self._disable_autopx() | ||||
else: | ||||
self._enable_autopx() | ||||
def _enable_autopx(self): | ||||
"""Enable %autopx mode by saving the original run_cell and installing | ||||
pxrun_cell. | ||||
""" | ||||
# override run_cell | ||||
self._original_run_cell = self.shell.run_cell | ||||
self.shell.run_cell = self.pxrun_cell | ||||
self._autopx = True | ||||
print "%autopx enabled" | ||||
def _disable_autopx(self): | ||||
"""Disable %autopx by restoring the original InteractiveShell.run_cell. | ||||
""" | ||||
if self._autopx: | ||||
self.shell.run_cell = self._original_run_cell | ||||
self._autopx = False | ||||
print "%autopx disabled" | ||||
def pxrun_cell(self, raw_cell, store_history=False, silent=False): | ||||
"""drop-in replacement for InteractiveShell.run_cell. | ||||
This executes code remotely, instead of in the local namespace. | ||||
See InteractiveShell.run_cell for details. | ||||
""" | ||||
if (not raw_cell) or raw_cell.isspace(): | ||||
return | ||||
ipself = self.shell | ||||
with ipself.builtin_trap: | ||||
cell = ipself.prefilter_manager.prefilter_lines(raw_cell) | ||||
# Store raw and processed history | ||||
if store_history: | ||||
ipself.history_manager.store_inputs(ipself.execution_count, | ||||
cell, raw_cell) | ||||
# ipself.logger.log(cell, raw_cell) | ||||
cell_name = ipself.compile.cache(cell, ipself.execution_count) | ||||
try: | ||||
ast.parse(cell, filename=cell_name) | ||||
except (OverflowError, SyntaxError, ValueError, TypeError, | ||||
MemoryError): | ||||
# Case 1 | ||||
ipself.showsyntaxerror() | ||||
ipself.execution_count += 1 | ||||
return None | ||||
except NameError: | ||||
# ignore name errors, because we don't know the remote keys | ||||
pass | ||||
if store_history: | ||||
# Write output to the database. Does nothing unless | ||||
# history output logging is enabled. | ||||
ipself.history_manager.store_output(ipself.execution_count) | ||||
# Each cell is a *single* input, regardless of how many lines it has | ||||
ipself.execution_count += 1 | ||||
if re.search(r'get_ipython\(\)\.magic\(u?["\']%?autopx', cell): | ||||
self._disable_autopx() | ||||
return False | ||||
else: | ||||
try: | ||||
MinRK
|
r7476 | result = self.view.execute(cell, silent=False, block=False) | ||
MinRK
|
r7473 | except: | ||
ipself.showtraceback() | ||||
return True | ||||
else: | ||||
MinRK
|
r7476 | if self.view.block: | ||
MinRK
|
r7473 | try: | ||
result.get() | ||||
except: | ||||
self.shell.showtraceback() | ||||
return True | ||||
else: | ||||
with ipself.builtin_trap: | ||||
result.display_outputs() | ||||
return False | ||||
__doc__ = __doc__.format( | ||||
AUTOPX_DOC = ' '*8 + ParallelMagics.autopx.__doc__, | ||||
PX_DOC = ' '*8 + ParallelMagics.px.__doc__, | ||||
MinRK
|
r7476 | RESULT_DOC = ' '*8 + ParallelMagics.result.__doc__, | ||
CONFIG_DOC = ' '*8 + ParallelMagics.pxconfig.__doc__, | ||||
MinRK
|
r7473 | ) | ||