# encoding: utf-8 """ ============= parallelmagic ============= Magic command interface for interactive parallel work. Usage ===== ``%autopx`` {AUTOPX_DOC} ``%px`` {PX_DOC} ``%pxresult`` {RESULT_DOC} ``%pxconfig`` {CONFIG_DOC} """ #----------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- import ast import re from IPython.core.error import UsageError from IPython.core.magic import Magics from IPython.core import magic_arguments from IPython.testing.skipdoctest import skip_doctest #----------------------------------------------------------------------------- # Definitions of magic functions for use with IPython #----------------------------------------------------------------------------- NO_LAST_RESULT = "%pxresult recalls last %px result, which has not yet been used." def exec_args(f): """decorator for adding block/targets args for execution applied to %pxconfig and %%px """ args = [ magic_arguments.argument('-b', '--block', action="store_const", const=True, dest='block', help="use blocking (sync) execution", ), magic_arguments.argument('-a', '--noblock', action="store_const", const=False, dest='block', help="use non-blocking (async) execution", ), magic_arguments.argument('-t', '--targets', type=str, help="specify the targets on which to execute", ), magic_arguments.argument('--local', action="store_const", const=True, dest="local", help="also execute the cell in the local namespace", ), magic_arguments.argument('--verbose', action="store_const", const=True, dest="set_verbose", help="print a message at each execution", ), magic_arguments.argument('--no-verbose', action="store_const", const=False, dest="set_verbose", help="don't print any messages", ), ] for a in args: f = a(f) return f def output_args(f): """decorator for output-formatting args applied to %pxresult and %%px """ args = [ magic_arguments.argument('-r', action="store_const", dest='groupby', const='order', help="collate outputs in order (same as group-outputs=order)" ), magic_arguments.argument('-e', action="store_const", dest='groupby', const='engine', help="group outputs by engine (same as group-outputs=engine)" ), magic_arguments.argument('--group-outputs', dest='groupby', type=str, choices=['engine', 'order', 'type'], default='type', help="""Group the outputs in a particular way. Choices are: type: group outputs of all engines by type (stdout, stderr, displaypub, etc.). engine: display all output for each engine together. order: like type, but individual displaypub output from each engine is collated. For example, if multiple plots are generated by each engine, the first figure of each engine will be displayed, then the second of each, etc. """ ), magic_arguments.argument('-o', '--out', dest='save_name', type=str, help="""store the AsyncResult object for this computation in the global namespace under this name. """ ), ] for a in args: f = a(f) return f class ParallelMagics(Magics): """A set of magics useful when controlling a parallel IPython cluster. """ # magic-related magics = None registered = True # suffix for magics suffix = '' # A flag showing if autopx is activated or not _autopx = False # the current view used by the magics: view = None # last result cache for %pxresult last_result = None # verbose flag verbose = False def __init__(self, shell, view, suffix=''): self.view = view self.suffix = suffix # register magics self.magics = dict(cell={},line={}) line_magics = self.magics['line'] px = 'px' + suffix if not suffix: # keep %result for legacy compatibility line_magics['result'] = self.result line_magics['pxresult' + suffix] = self.result line_magics[px] = self.px line_magics['pxconfig' + suffix] = self.pxconfig line_magics['auto' + px] = self.autopx self.magics['cell'][px] = self.cell_px super(ParallelMagics, self).__init__(shell=shell) def _eval_target_str(self, ts): if ':' in ts: targets = eval("self.view.client.ids[%s]" % ts) elif 'all' in ts: targets = 'all' else: targets = eval(ts) return targets @magic_arguments.magic_arguments() @exec_args def pxconfig(self, line): """configure default targets/blocking for %px magics""" args = magic_arguments.parse_argstring(self.pxconfig, line) if args.targets: self.view.targets = self._eval_target_str(args.targets) if args.block is not None: self.view.block = args.block if args.set_verbose is not None: self.verbose = args.set_verbose @magic_arguments.magic_arguments() @output_args @skip_doctest def result(self, line=''): """Print the result of the last asynchronous %px command. This lets you recall the results of %px computations after asynchronous submission (block=False). Examples -------- :: In [23]: %px os.getpid() Async parallel execution on engine(s): all In [24]: %pxresult Out[8:10]: 60920 Out[9:10]: 60921 Out[10:10]: 60922 Out[11:10]: 60923 """ args = magic_arguments.parse_argstring(self.result, line) if self.last_result is None: raise UsageError(NO_LAST_RESULT) self.last_result.get() self.last_result.display_outputs(groupby=args.groupby) @skip_doctest def px(self, line=''): """Executes the given python command in parallel. Examples -------- :: In [24]: %px a = os.getpid() Parallel execution on engine(s): all In [25]: %px print a [stdout:0] 1234 [stdout:1] 1235 [stdout:2] 1236 [stdout:3] 1237 """ return self.parallel_execute(line) def parallel_execute(self, cell, block=None, groupby='type', save_name=None): """implementation used by %px and %%parallel""" # defaults: block = self.view.block if block is None else block base = "Parallel" if block else "Async parallel" targets = self.view.targets if isinstance(targets, list) and len(targets) > 10: str_targets = str(targets[:4])[:-1] + ', ..., ' + str(targets[-4:])[1:] else: str_targets = str(targets) if self.verbose: print base + " execution on engine(s): %s" % str_targets result = self.view.execute(cell, silent=False, block=False) self.last_result = result if save_name: self.shell.user_ns[save_name] = result if block: result.get() result.display_outputs(groupby) else: # return AsyncResult only on non-blocking submission return result @magic_arguments.magic_arguments() @exec_args @output_args @skip_doctest def cell_px(self, line='', cell=None): """Executes the cell in parallel. Examples -------- :: In [24]: %%px --noblock ....: a = os.getpid() Async parallel execution on engine(s): all In [25]: %%px ....: print a [stdout:0] 1234 [stdout:1] 1235 [stdout:2] 1236 [stdout:3] 1237 """ args = magic_arguments.parse_argstring(self.cell_px, line) if args.targets: save_targets = self.view.targets self.view.targets = self._eval_target_str(args.targets) if args.local: self.shell.run_cell(cell) try: return self.parallel_execute(cell, block=args.block, groupby=args.groupby, save_name=args.save_name, ) finally: if args.targets: self.view.targets = save_targets @skip_doctest def autopx(self, line=''): """Toggles auto parallel mode. Once this is called, all commands typed at the command line are send to the engines to be executed in parallel. To control which engine are used, the ``targets`` attribute of the view before entering ``%autopx`` mode. Then you can do the following:: In [25]: %autopx %autopx to enabled In [26]: a = 10 Parallel execution on engine(s): [0,1,2,3] In [27]: print a Parallel execution on engine(s): [0,1,2,3] [stdout:0] 10 [stdout:1] 10 [stdout:2] 10 [stdout:3] 10 In [27]: %autopx %autopx disabled """ if self._autopx: self._disable_autopx() else: self._enable_autopx() def _enable_autopx(self): """Enable %autopx mode by saving the original run_cell and installing pxrun_cell. """ # override run_cell self._original_run_cell = self.shell.run_cell self.shell.run_cell = self.pxrun_cell self._autopx = True print "%autopx enabled" def _disable_autopx(self): """Disable %autopx by restoring the original InteractiveShell.run_cell. """ if self._autopx: self.shell.run_cell = self._original_run_cell self._autopx = False print "%autopx disabled" def pxrun_cell(self, raw_cell, store_history=False, silent=False): """drop-in replacement for InteractiveShell.run_cell. This executes code remotely, instead of in the local namespace. See InteractiveShell.run_cell for details. """ if (not raw_cell) or raw_cell.isspace(): return ipself = self.shell with ipself.builtin_trap: cell = ipself.prefilter_manager.prefilter_lines(raw_cell) # Store raw and processed history if store_history: ipself.history_manager.store_inputs(ipself.execution_count, cell, raw_cell) # ipself.logger.log(cell, raw_cell) cell_name = ipself.compile.cache(cell, ipself.execution_count) try: ast.parse(cell, filename=cell_name) except (OverflowError, SyntaxError, ValueError, TypeError, MemoryError): # Case 1 ipself.showsyntaxerror() ipself.execution_count += 1 return None except NameError: # ignore name errors, because we don't know the remote keys pass if store_history: # Write output to the database. Does nothing unless # history output logging is enabled. ipself.history_manager.store_output(ipself.execution_count) # Each cell is a *single* input, regardless of how many lines it has ipself.execution_count += 1 if re.search(r'get_ipython\(\)\.magic\(u?["\']%?autopx', cell): self._disable_autopx() return False else: try: result = self.view.execute(cell, silent=False, block=False) except: ipself.showtraceback() return True else: if self.view.block: try: result.get() except: self.shell.showtraceback() return True else: with ipself.builtin_trap: result.display_outputs() return False __doc__ = __doc__.format( AUTOPX_DOC = ' '*8 + ParallelMagics.autopx.__doc__, PX_DOC = ' '*8 + ParallelMagics.px.__doc__, RESULT_DOC = ' '*8 + ParallelMagics.result.__doc__, CONFIG_DOC = ' '*8 + ParallelMagics.pxconfig.__doc__, )