From a136082848a34849a6b831a0c19acb9cc75d462c 2012-05-29 06:50:40 From: Fernando Perez Date: 2012-05-29 06:50:40 Subject: [PATCH] Merge pull request #1768 from minrk/parallelmagics Update parallel magics They now display all output, so you can do parallel plotting or other actions with complex display. The `px` magic has now both line and cell modes, and in cell mode finer control has been added about how to collate output from multiple engines. Tests, docs and example notebook added. --- diff --git a/IPython/core/magic.py b/IPython/core/magic.py index 6d1c9d1..4f39a53 100644 --- a/IPython/core/magic.py +++ b/IPython/core/magic.py @@ -542,7 +542,7 @@ class Magics(object): argv = arg_split(arg_str, posix, strict) # Do regular option processing try: - opts,args = getopt(argv,opt_str,*long_opts) + opts,args = getopt(argv, opt_str, long_opts) except GetoptError,e: raise UsageError('%s ( allowed: "%s" %s)' % (e.msg,opt_str, " ".join(long_opts))) diff --git a/IPython/core/tests/test_magic.py b/IPython/core/tests/test_magic.py index 296d895..b6572dc 100644 --- a/IPython/core/tests/test_magic.py +++ b/IPython/core/tests/test_magic.py @@ -69,6 +69,15 @@ def test_magic_parse_options(): expected = path nt.assert_equals(opts['f'], expected) +def test_magic_parse_long_options(): + """Magic.parse_options can handle --foo=bar long options""" + ip = get_ipython() + m = DummyMagics(ip) + opts, _ = m.parse_options('--foo --bar=bubble', 'a', 'foo', 'bar=') + nt.assert_true('foo' in opts) + nt.assert_true('bar' in opts) + nt.assert_equals(opts['bar'], "bubble") + @dec.skip_without('sqlite3') def doctest_hist_f(): diff --git a/IPython/extensions/parallelmagic.py b/IPython/extensions/parallelmagic.py index abb49fb..80c30e2 100644 --- a/IPython/extensions/parallelmagic.py +++ b/IPython/extensions/parallelmagic.py @@ -11,15 +11,15 @@ Usage ``%autopx`` -@AUTOPX_DOC@ +{AUTOPX_DOC} ``%px`` -@PX_DOC@ +{PX_DOC} ``%result`` -@RESULT_DOC@ +{RESULT_DOC} """ @@ -37,87 +37,172 @@ Usage import ast 
import re -from IPython.core.magic import Magics, magics_class, line_magic +from IPython.core.error import UsageError +from IPython.core.magic import Magics, magics_class, line_magic, cell_magic from IPython.testing.skipdoctest import skip_doctest #----------------------------------------------------------------------------- # Definitions of magic functions for use with IPython #----------------------------------------------------------------------------- -NO_ACTIVE_VIEW = """ -Use activate() on a DirectView object to activate it for magics. -""" + +NO_ACTIVE_VIEW = "Use activate() on a DirectView object to use it with magics." @magics_class class ParallelMagics(Magics): """A set of magics useful when controlling a parallel IPython cluster. """ - - def __init__(self, shell): - super(ParallelMagics, self).__init__(shell) - # A flag showing if autopx is activated or not - self.autopx = False + + # A flag showing if autopx is activated or not + _autopx = False + # the current view used by the magics: + active_view = None @skip_doctest @line_magic def result(self, parameter_s=''): - """Print the result of command i on all engines.. + """Print the result of command i on all engines. To use this a :class:`DirectView` instance must be created and then activated by calling its :meth:`activate` method. + + This lets you recall the results of %px computations after + asynchronous submission (view.block=False). 
Then you can do the following:: - In [23]: %result - Out[23]: - - [0] In [6]: a = 10 - [1] In [6]: a = 10 - - In [22]: %result 6 - Out[22]: - - [0] In [6]: a = 10 - [1] In [6]: a = 10 + In [23]: %px os.getpid() + Async parallel execution on engine(s): all + + In [24]: %result + [ 8] Out[10]: 60920 + [ 9] Out[10]: 60921 + [10] Out[10]: 60922 + [11] Out[10]: 60923 """ + if self.active_view is None: - print NO_ACTIVE_VIEW - return - + raise UsageError(NO_ACTIVE_VIEW) + + stride = len(self.active_view) try: index = int(parameter_s) except: - index = None - result = self.active_view.get_result(index) - return result + index = -1 + msg_ids = self.active_view.history[stride * index:(stride * (index + 1)) or None] + + result = self.active_view.get_result(msg_ids) + + result.get() + result.display_outputs() @skip_doctest @line_magic def px(self, parameter_s=''): """Executes the given python command in parallel. - + To use this a :class:`DirectView` instance must be created and then activated by calling its :meth:`activate` method. 
Then you can do the following:: - In [24]: %px a = 5 + In [24]: %px a = os.getpid() Parallel execution on engine(s): all - Out[24]: - - [0] In [7]: a = 5 - [1] In [7]: a = 5 + + In [25]: %px print a + [stdout:0] 1234 + [stdout:1] 1235 + [stdout:2] 1236 + [stdout:3] 1237 """ + return self.parallel_execute(parameter_s) + + def parallel_execute(self, cell, block=None, groupby='type'): + """implementation used by %px and %%px""" if self.active_view is None: - print NO_ACTIVE_VIEW - return - print "Parallel execution on engine(s): %s" % self.active_view.targets - result = self.active_view.execute(parameter_s, block=False) - if self.active_view.block: + raise UsageError(NO_ACTIVE_VIEW) + + # defaults: + block = self.active_view.block if block is None else block + + base = "Parallel" if block else "Async parallel" + print base + " execution on engine(s): %s" % self.active_view.targets + + result = self.active_view.execute(cell, silent=False, block=False) + if block: result.get() - self._maybe_display_output(result) + result.display_outputs(groupby) + else: + # return AsyncResult only on non-blocking submission + return result + + @skip_doctest + @cell_magic('px') + def cell_px(self, line='', cell=None): + """Executes the given python command in parallel. + + Cell magic usage: + + %%px [-o] [-e] [--group-outputs=type|engine|order] [--[no]block] + + Options (%%px cell magic only): + + -o: collate outputs in order (same as --group-outputs=order) + + -e: group outputs by engine (same as --group-outputs=engine) + + --group-outputs=type [default behavior]: + each output type (stdout, stderr, displaypub) for all engines + displayed together. + + --group-outputs=order: + The same as 'type', but individual displaypub outputs (e.g. plots) + will be interleaved, so it will display all of the first plots, + then all of the second plots, etc. + + --group-outputs=engine: + All of an engine's output is displayed before moving on to the next.
+ + --[no]block: + Whether or not to block for the execution to complete + (and display the results). If unspecified, the active view's block attribute is used. + + + To use this a :class:`DirectView` instance must be created + and then activated by calling its :meth:`activate` method. + + Then you can do the following:: + + In [24]: %%px --noblock a = os.getpid() + Async parallel execution on engine(s): all + + In [25]: %px print a + [stdout:0] 1234 + [stdout:1] 1235 + [stdout:2] 1236 + [stdout:3] 1237 + """ + + block = None + groupby = 'type' + # as a cell magic, we accept args + opts, _ = self.parse_options(line, 'oe', 'group-outputs=', 'block', 'noblock') + + if 'group-outputs' in opts: + groupby = opts['group-outputs'] + elif 'o' in opts: + groupby = 'order' + elif 'e' in opts: + groupby = 'engine' + + if 'block' in opts: + block = True + elif 'noblock' in opts: + block = False + + return self.parallel_execute(cell, block=block, groupby=groupby) @skip_doctest @line_magic @@ -149,7 +234,7 @@ class ParallelMagics(Magics): In [27]: %autopx %autopx disabled """ - if self.autopx: + if self._autopx: self._disable_autopx() else: self._enable_autopx() @@ -159,50 +244,23 @@ class ParallelMagics(Magics): pxrun_cell. """ if self.active_view is None: - print NO_ACTIVE_VIEW - return + raise UsageError(NO_ACTIVE_VIEW) - # override run_cell and run_code + # override run_cell self._original_run_cell = self.shell.run_cell self.shell.run_cell = self.pxrun_cell - self._original_run_code = self.shell.run_code - self.shell.run_code = self.pxrun_code - self.autopx = True + self._autopx = True print "%autopx enabled" def _disable_autopx(self): """Disable %autopx by restoring the original InteractiveShell.run_cell.
""" - if self.autopx: + if self._autopx: self.shell.run_cell = self._original_run_cell - self.shell.run_code = self._original_run_code - self.autopx = False + self._autopx = False print "%autopx disabled" - def _maybe_display_output(self, result): - """Maybe display the output of a parallel result. - - If self.active_view.block is True, wait for the result - and display the result. Otherwise, this is a noop. - """ - if isinstance(result.stdout, basestring): - # single result - stdouts = [result.stdout.rstrip()] - else: - stdouts = [s.rstrip() for s in result.stdout] - - targets = self.active_view.targets - if isinstance(targets, int): - targets = [targets] - elif targets == 'all': - targets = self.active_view.client.ids - - if any(stdouts): - for eid,stdout in zip(targets, stdouts): - print '[stdout:%i]'%eid, stdout - - def pxrun_cell(self, raw_cell, store_history=False, silent=False): """drop-in replacement for InteractiveShell.run_cell. @@ -263,47 +321,16 @@ class ParallelMagics(Magics): self.shell.showtraceback() return True else: - self._maybe_display_output(result) - return False - - def pxrun_code(self, code_obj): - """drop-in replacement for InteractiveShell.run_code. - - This executes code remotely, instead of in the local namespace. - - See InteractiveShell.run_code for details. 
- """ - ipself = self.shell - # check code object for the autopx magic - if 'get_ipython' in code_obj.co_names and 'magic' in code_obj.co_names \ - and any( [ isinstance(c, basestring) and 'autopx' in c - for c in code_obj.co_consts ]): - self._disable_autopx() - return False - else: - try: - result = self.active_view.execute(code_obj, block=False) - except: - ipself.showtraceback() - return True - else: - if self.active_view.block: - try: - result.get() - except: - self.shell.showtraceback() - return True - else: - self._maybe_display_output(result) + with ipself.builtin_trap: + result.display_outputs() return False -__doc__ = __doc__.replace('@AUTOPX_DOC@', - " " + ParallelMagics.autopx.__doc__) -__doc__ = __doc__.replace('@PX_DOC@', - " " + ParallelMagics.px.__doc__) -__doc__ = __doc__.replace('@RESULT_DOC@', - " " + ParallelMagics.result.__doc__) +__doc__ = __doc__.format( + AUTOPX_DOC = ' '*8 + ParallelMagics.autopx.__doc__, + PX_DOC = ' '*8 + ParallelMagics.px.__doc__, + RESULT_DOC = ' '*8 + ParallelMagics.result.__doc__ +) _loaded = False diff --git a/IPython/parallel/client/asyncresult.py b/IPython/parallel/client/asyncresult.py index 5931b6f..1bae6f6 100644 --- a/IPython/parallel/client/asyncresult.py +++ b/IPython/parallel/client/asyncresult.py @@ -21,7 +21,7 @@ from datetime import datetime from zmq import MessageTracker -from IPython.core.display import clear_output +from IPython.core.display import clear_output, display from IPython.external.decorator import decorator from IPython.parallel import error @@ -377,6 +377,140 @@ class AsyncResult(object): sys.stdout.flush() print print "done" + + def _republish_displaypub(self, content, eid): + """republish individual displaypub content dicts""" + try: + ip = get_ipython() + except NameError: + # displaypub is meaningless outside IPython + return + md = content['metadata'] or {} + md['engine'] = eid + ip.display_pub.publish(content['source'], content['data'], md) + + + def _display_single_result(self): + + 
print self.stdout + print >> sys.stderr, self.stderr + + try: + get_ipython() + except NameError: + # displaypub is meaningless outside IPython + return + + for output in self.outputs: + self._republish_displaypub(output, self.engine_id) + + if self.pyout is not None: + display(self.get()) + + @check_ready + def display_outputs(self, groupby="type"): + """republish the outputs of the computation + + Parameters + ---------- + + groupby : str [default: type] + if 'type': + Group outputs by type (show all stdout, then all stderr, etc.): + + [stdout:1] foo + [stdout:2] foo + [stderr:1] bar + [stderr:2] bar + if 'engine': + Display outputs for each engine before moving on to the next: + + [stdout:1] foo + [stderr:1] bar + [stdout:2] foo + [stderr:2] bar + + if 'order': + Like 'type', but further collate individual displaypub + outputs. This is meant for cases of each command producing + several plots, and you would like to see all of the first + plots together, then all of the second plots, and so on. 
+ """ + # flush iopub, just in case + self._client._flush_iopub(self._client._iopub_socket) + if self._single_result: + self._display_single_result() + return + + stdouts = [s.rstrip() for s in self.stdout] + stderrs = [s.rstrip() for s in self.stderr] + pyouts = [p for p in self.pyout] + output_lists = self.outputs + results = self.get() + + targets = self.engine_id + + if groupby == "engine": + for eid,stdout,stderr,outputs,r,pyout in zip( + targets, stdouts, stderrs, output_lists, results, pyouts + ): + if stdout: + print '[stdout:%i]' % eid, stdout + if stderr: + print >> sys.stderr, '[stderr:%i]' % eid, stderr + + try: + get_ipython() + except NameError: + # displaypub is meaningless outside IPython + continue + + for output in outputs: + self._republish_displaypub(output, eid) + + if pyout is not None: + display(r) + + elif groupby in ('type', 'order'): + # republish stdout: + if any(stdouts): + for eid,stdout in zip(targets, stdouts): + print '[stdout:%i]' % eid, stdout + + # republish stderr: + if any(stderrs): + for eid,stderr in zip(targets, stderrs): + print >> sys.stderr, '[stderr:%i]' % eid, stderr + + try: + get_ipython() + except NameError: + # displaypub is meaningless outside IPython + return + + if groupby == 'order': + output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists)) + N = max(len(outputs) for outputs in output_lists) + for i in range(N): + for eid in targets: + outputs = output_dict[eid] + if len(outputs) > i: + self._republish_displaypub(outputs[i], eid) + else: + # republish displaypub output + for eid,outputs in zip(targets, output_lists): + for output in outputs: + self._republish_displaypub(output, eid) + + # finally, add pyout: + for eid,r,pyout in zip(targets, results, pyouts): + if pyout is not None: + display(r) + + else: + raise ValueError("groupby must be one of 'type', 'engine', 'order', not %r" % groupby) + + class AsyncMapResult(AsyncResult): diff --git a/IPython/parallel/client/client.py 
b/IPython/parallel/client/client.py index 7770087..0d35303 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -33,6 +33,7 @@ import zmq from IPython.config.configurable import MultipleInstanceError from IPython.core.application import BaseIPythonApplication +from IPython.utils.coloransi import TermColors from IPython.utils.jsonutil import rekey from IPython.utils.localinterfaces import LOCAL_IPS from IPython.utils.path import get_ipython_dir @@ -90,13 +91,39 @@ class ExecuteReply(object): return self.metadata[key] def __repr__(self): - pyout = self.metadata['pyout'] or {} - text_out = pyout.get('data', {}).get('text/plain', '') + pyout = self.metadata['pyout'] or {'data':{}} + text_out = pyout['data'].get('text/plain', '') if len(text_out) > 32: text_out = text_out[:29] + '...' return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out) + def _repr_pretty_(self, p, cycle): + pyout = self.metadata['pyout'] or {'data':{}} + text_out = pyout['data'].get('text/plain', '') + + if not text_out: + return + + try: + ip = get_ipython() + except NameError: + colors = "NoColor" + else: + colors = ip.colors + + if colors == "NoColor": + out = normal = "" + else: + out = TermColors.Red + normal = TermColors.Normal + + p.text( + u'[%i] ' % self.metadata['engine_id'] + + out + u'Out[%i]: ' % self.execution_count + + normal + text_out + ) + def _repr_html_(self): pyout = self.metadata['pyout'] or {'data':{}} return pyout['data'].get("text/html") diff --git a/IPython/parallel/tests/clienttest.py b/IPython/parallel/tests/clienttest.py index b902293..60017b6 100644 --- a/IPython/parallel/tests/clienttest.py +++ b/IPython/parallel/tests/clienttest.py @@ -15,6 +15,7 @@ Authors: import sys import tempfile import time +from StringIO import StringIO from nose import SkipTest @@ -59,6 +60,28 @@ def raiser(eclass): """raise an exception""" raise eclass() +def generate_output(): + """function for testing output + + publishes two outputs of each type, and returns + a 
rich displayable object. + """ + + import sys + from IPython.core.display import display, HTML, Math + + print "stdout" + print >> sys.stderr, "stderr" + + display(HTML("HTML")) + + print "stdout2" + print >> sys.stderr, "stderr2" + + display(Math(r"\alpha=\beta")) + + return Math("42") + # test decorator for skipping tests when libraries are unavailable def skip_without(*names): """skip a test if some names are not importable""" @@ -73,6 +96,41 @@ def skip_without(*names): return f(*args, **kwargs) return skip_without_names +#------------------------------------------------------------------------------- +# Classes +#------------------------------------------------------------------------------- + +class CapturedIO(object): + """Simple object for containing captured stdout/err StringIO objects""" + + def __init__(self, stdout, stderr): + self.stdout_io = stdout + self.stderr_io = stderr + + @property + def stdout(self): + return self.stdout_io.getvalue() + + @property + def stderr(self): + return self.stderr_io.getvalue() + + +class capture_output(object): + """context manager for capturing stdout/err""" + + def __enter__(self): + self.sys_stdout = sys.stdout + self.sys_stderr = sys.stderr + stdout = sys.stdout = StringIO() + stderr = sys.stderr = StringIO() + return CapturedIO(stdout, stderr) + + def __exit__(self, exc_type, exc_value, traceback): + sys.stdout = self.sys_stdout + sys.stderr = self.sys_stderr + + class ClusterTestCase(BaseZMQTestCase): def add_engines(self, n=1, block=True): @@ -117,6 +175,17 @@ class ClusterTestCase(BaseZMQTestCase): else: self.fail("should have raised a RemoteError") + def _wait_for(self, f, timeout=10): + """wait for a condition""" + tic = time.time() + while time.time() <= tic + timeout: + if f(): + return + time.sleep(0.1) + self.client.spin() + if not f(): + print "Warning: Awaited condition never arrived" + def setUp(self): BaseZMQTestCase.setUp(self) self.client = self.connect_client() diff --git 
a/IPython/parallel/tests/test_magics.py b/IPython/parallel/tests/test_magics.py new file mode 100644 index 0000000..5d5482d --- /dev/null +++ b/IPython/parallel/tests/test_magics.py @@ -0,0 +1,342 @@ +# -*- coding: utf-8 -*- +"""Test Parallel magics + +Authors: + +* Min RK +""" +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- + +import sys +import time + +import zmq +from nose import SkipTest + +from IPython.testing import decorators as dec +from IPython.testing.ipunittest import ParametricTestCase + +from IPython import parallel as pmod +from IPython.parallel import error +from IPython.parallel import AsyncResult +from IPython.parallel.util import interactive + +from IPython.parallel.tests import add_engines + +from .clienttest import ClusterTestCase, capture_output, generate_output + +def setup(): + add_engines(3, total=True) + +class TestParallelMagics(ClusterTestCase, ParametricTestCase): + + def test_px_blocking(self): + ip = get_ipython() + v = self.client[-1:] + v.activate() + v.block=True + + ip.magic('px a=5') + self.assertEquals(v['a'], [5]) + ip.magic('px a=10') + self.assertEquals(v['a'], [10]) + # just 'print a' works ~99% of the time, but this ensures that + # the stdout message has arrived when the result is finished: + with capture_output() as io: + ip.magic( + 'px import sys,time;print(a);sys.stdout.flush();time.sleep(0.2)' + ) + out = io.stdout + self.assertTrue('[stdout:' in out, out) + self.assertTrue(out.rstrip().endswith('10')) + self.assertRaisesRemote(ZeroDivisionError, 
ip.magic, 'px 1/0') + + def test_cellpx_block_args(self): + """%%px --[no]block flags work""" + ip = get_ipython() + v = self.client[-1:] + v.activate() + v.block=False + + for block in (True, False): + v.block = block + + with capture_output() as io: + ip.run_cell_magic("px", "", "1") + if block: + self.assertTrue(io.stdout.startswith("Parallel"), io.stdout) + else: + self.assertTrue(io.stdout.startswith("Async"), io.stdout) + + with capture_output() as io: + ip.run_cell_magic("px", "--block", "1") + self.assertTrue(io.stdout.startswith("Parallel"), io.stdout) + + with capture_output() as io: + ip.run_cell_magic("px", "--noblock", "1") + self.assertTrue(io.stdout.startswith("Async"), io.stdout) + + def test_cellpx_groupby_engine(self): + """%%px --group-outputs=engine""" + ip = get_ipython() + v = self.client[:] + v.block = True + v.activate() + + v['generate_output'] = generate_output + + with capture_output() as io: + ip.run_cell_magic('px', '--group-outputs=engine', 'generate_output()') + + lines = io.stdout.strip().splitlines()[1:] + expected = [ + ('[stdout:', '] stdout'), + 'stdout2', + 'IPython.core.display.HTML', + 'IPython.core.display.Math', + ('] Out[', 'IPython.core.display.Math') + ] * len(v) + + self.assertEquals(len(lines), len(expected), io.stdout) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + expected = [ + ('[stderr:', '] stderr'), + 'stderr2', + ] * len(v) + + lines = io.stderr.strip().splitlines() + self.assertEquals(len(lines), len(expected), io.stderr) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + + def test_cellpx_groupby_order(self): + """%%px --group-outputs=order""" + ip = get_ipython() + v = self.client[:] + v.block = True + v.activate() + + v['generate_output'] = 
generate_output + + with capture_output() as io: + ip.run_cell_magic('px', '--group-outputs=order', 'generate_output()') + + lines = io.stdout.strip().splitlines()[1:] + expected = [] + expected.extend([ + ('[stdout:', '] stdout'), + 'stdout2', + ] * len(v)) + expected.extend([ + 'IPython.core.display.HTML', + ] * len(v)) + expected.extend([ + 'IPython.core.display.Math', + ] * len(v)) + expected.extend([ + ('] Out[', 'IPython.core.display.Math') + ] * len(v)) + + self.assertEquals(len(lines), len(expected), io.stdout) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + expected = [ + ('[stderr:', '] stderr'), + 'stderr2', + ] * len(v) + + lines = io.stderr.strip().splitlines() + self.assertEquals(len(lines), len(expected), io.stderr) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + def test_cellpx_groupby_atype(self): + """%%px --group-outputs=type""" + ip = get_ipython() + v = self.client[:] + v.block = True + v.activate() + + v['generate_output'] = generate_output + + with capture_output() as io: + ip.run_cell_magic('px', '--group-outputs=type', 'generate_output()') + + lines = io.stdout.strip().splitlines()[1:] + + expected = [] + expected.extend([ + ('[stdout:', '] stdout'), + 'stdout2', + ] * len(v)) + expected.extend([ + 'IPython.core.display.HTML', + 'IPython.core.display.Math', + ] * len(v)) + expected.extend([ + ('] Out[', 'IPython.core.display.Math') + ] * len(v)) + + self.assertEquals(len(lines), len(expected), io.stdout) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + expected = [ + ('[stderr:', '] stderr'), + 'stderr2', + ] * len(v) + + lines = 
io.stderr.strip().splitlines() + self.assertEquals(len(lines), len(expected), io.stderr) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + + def test_px_nonblocking(self): + ip = get_ipython() + v = self.client[-1:] + v.activate() + v.block=False + + ip.magic('px a=5') + self.assertEquals(v['a'], [5]) + ip.magic('px a=10') + self.assertEquals(v['a'], [10]) + with capture_output() as io: + ar = ip.magic('px print (a)') + self.assertTrue(isinstance(ar, AsyncResult)) + self.assertTrue('Async' in io.stdout) + self.assertFalse('[stdout:' in io.stdout) + ar = ip.magic('px 1/0') + self.assertRaisesRemote(ZeroDivisionError, ar.get) + + def test_autopx_blocking(self): + ip = get_ipython() + v = self.client[-1] + v.activate() + v.block=True + + with capture_output() as io: + ip.magic('autopx') + ip.run_cell('\n'.join(('a=5','b=12345','c=0'))) + ip.run_cell('b*=2') + ip.run_cell('print (b)') + ip.run_cell('b') + ip.run_cell("b/c") + ip.magic('autopx') + + output = io.stdout.strip() + + self.assertTrue(output.startswith('%autopx enabled'), output) + self.assertTrue(output.endswith('%autopx disabled'), output) + self.assertTrue('RemoteError: ZeroDivisionError' in output, output) + self.assertTrue('] Out[' in output, output) + self.assertTrue(': 24690' in output, output) + ar = v.get_result(-1) + self.assertEquals(v['a'], 5) + self.assertEquals(v['b'], 24690) + self.assertRaisesRemote(ZeroDivisionError, ar.get) + + def test_autopx_nonblocking(self): + ip = get_ipython() + v = self.client[-1] + v.activate() + v.block=False + + with capture_output() as io: + ip.magic('autopx') + ip.run_cell('\n'.join(('a=5','b=10','c=0'))) + ip.run_cell('print (b)') + ip.run_cell('import time; time.sleep(0.1)') + ip.run_cell("b/c") + ip.run_cell('b*=2') + ip.magic('autopx') + + output = io.stdout.strip() + + self.assertTrue(output.startswith('%autopx enabled')) + 
self.assertTrue(output.endswith('%autopx disabled')) + self.assertFalse('ZeroDivisionError' in output) + ar = v.get_result(-2) + self.assertRaisesRemote(ZeroDivisionError, ar.get) + # prevent TaskAborted on pulls, due to ZeroDivisionError + time.sleep(0.5) + self.assertEquals(v['a'], 5) + # b*=2 will not fire, due to abort + self.assertEquals(v['b'], 10) + + def test_result(self): + ip = get_ipython() + v = self.client[-1] + v.activate() + data = dict(a=111,b=222) + v.push(data, block=True) + + ip.magic('px a') + ip.magic('px b') + for idx, name in [ + ('', 'b'), + ('-1', 'b'), + ('2', 'b'), + ('1', 'a'), + ('-2', 'a'), + ]: + with capture_output() as io: + ip.magic('result ' + idx) + output = io.stdout.strip() + msg = "expected %s output to include %s, but got: %s" % \ + ('%result '+idx, str(data[name]), output) + self.assertTrue(str(data[name]) in output, msg) + + @dec.skipif_not_matplotlib + def test_px_pylab(self): + """%pylab works on engines""" + ip = get_ipython() + v = self.client[-1] + v.block = True + v.activate() + + with capture_output() as io: + ip.magic("px %pylab inline") + + self.assertTrue("Welcome to pylab" in io.stdout, io.stdout) + self.assertTrue("backend_inline" in io.stdout, io.stdout) + + with capture_output() as io: + ip.magic("px plot(rand(100))") + + self.assertTrue('] Out[' in io.stdout, io.stdout) + self.assertTrue('matplotlib.lines' in io.stdout, io.stdout) + + diff --git a/IPython/parallel/tests/test_view.py b/IPython/parallel/tests/test_view.py index d8cc803..e39b494 100644 --- a/IPython/parallel/tests/test_view.py +++ b/IPython/parallel/tests/test_view.py @@ -366,118 +366,6 @@ class TestView(ClusterTestCase, ParametricTestCase): self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split()) - # parallel magic tests - - def test_magic_px_blocking(self): - ip = get_ipython() - v = self.client[-1] - v.activate() - v.block=True - - ip.magic('px a=5') - self.assertEquals(v['a'], 5) - ip.magic('px a=10') - 
self.assertEquals(v['a'], 10) - sio = StringIO() - savestdout = sys.stdout - sys.stdout = sio - # just 'print a' worst ~99% of the time, but this ensures that - # the stdout message has arrived when the result is finished: - ip.magic('px import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)') - sys.stdout = savestdout - buf = sio.getvalue() - self.assertTrue('[stdout:' in buf, buf) - self.assertTrue(buf.rstrip().endswith('10')) - self.assertRaisesRemote(ZeroDivisionError, ip.magic, 'px 1/0') - - def test_magic_px_nonblocking(self): - ip = get_ipython() - v = self.client[-1] - v.activate() - v.block=False - - ip.magic('px a=5') - self.assertEquals(v['a'], 5) - ip.magic('px a=10') - self.assertEquals(v['a'], 10) - sio = StringIO() - savestdout = sys.stdout - sys.stdout = sio - ip.magic('px print a') - sys.stdout = savestdout - buf = sio.getvalue() - self.assertFalse('[stdout:%i]'%v.targets in buf) - ip.magic('px 1/0') - ar = v.get_result(-1) - self.assertRaisesRemote(ZeroDivisionError, ar.get) - - def test_magic_autopx_blocking(self): - ip = get_ipython() - v = self.client[-1] - v.activate() - v.block=True - - sio = StringIO() - savestdout = sys.stdout - sys.stdout = sio - ip.magic('autopx') - ip.run_cell('\n'.join(('a=5','b=10','c=0'))) - ip.run_cell('b*=2') - ip.run_cell('print (b)') - ip.run_cell("b/c") - ip.magic('autopx') - sys.stdout = savestdout - output = sio.getvalue().strip() - self.assertTrue(output.startswith('%autopx enabled')) - self.assertTrue(output.endswith('%autopx disabled')) - self.assertTrue('RemoteError: ZeroDivisionError' in output) - ar = v.get_result(-1) - self.assertEquals(v['a'], 5) - self.assertEquals(v['b'], 20) - self.assertRaisesRemote(ZeroDivisionError, ar.get) - - def test_magic_autopx_nonblocking(self): - ip = get_ipython() - v = self.client[-1] - v.activate() - v.block=False - - sio = StringIO() - savestdout = sys.stdout - sys.stdout = sio - ip.magic('autopx') - ip.run_cell('\n'.join(('a=5','b=10','c=0'))) - 
ip.run_cell('print (b)') - ip.run_cell('import time; time.sleep(0.1)') - ip.run_cell("b/c") - ip.run_cell('b*=2') - ip.magic('autopx') - sys.stdout = savestdout - output = sio.getvalue().strip() - self.assertTrue(output.startswith('%autopx enabled')) - self.assertTrue(output.endswith('%autopx disabled')) - self.assertFalse('ZeroDivisionError' in output) - ar = v.get_result(-2) - self.assertRaisesRemote(ZeroDivisionError, ar.get) - # prevent TaskAborted on pulls, due to ZeroDivisionError - time.sleep(0.5) - self.assertEquals(v['a'], 5) - # b*=2 will not fire, due to abort - self.assertEquals(v['b'], 10) - - def test_magic_result(self): - ip = get_ipython() - v = self.client[-1] - v.activate() - v['a'] = 111 - ra = v['a'] - - ar = ip.magic('result') - self.assertEquals(ar.msg_ids, [v.history[-1]]) - self.assertEquals(ar.get(), 111) - ar = ip.magic('result -2') - self.assertEquals(ar.msg_ids, [v.history[-2]]) - def test_unicode_execute(self): """test executing unicode strings""" v = self.client[-1] @@ -575,16 +463,6 @@ class TestView(ClusterTestCase, ParametricTestCase): # begin execute tests - def _wait_for(self, f, timeout=10): - tic = time.time() - while time.time() <= tic + timeout: - if f(): - return - time.sleep(0.1) - self.client.spin() - if not f(): - print "Warning: Awaited condition never arrived" - def test_execute_reply(self): e0 = self.client[self.client.ids[0]] diff --git a/IPython/zmq/displayhook.py b/IPython/zmq/displayhook.py index a3f76ab..a3d9178 100644 --- a/IPython/zmq/displayhook.py +++ b/IPython/zmq/displayhook.py @@ -31,12 +31,15 @@ class ZMQDisplayHook(object): def _encode_binary(format_dict): + encoded = format_dict.copy() pngdata = format_dict.get('image/png') - if pngdata is not None: - format_dict['image/png'] = encodestring(pngdata).decode('ascii') + if isinstance(pngdata, bytes): + encoded['image/png'] = encodestring(pngdata).decode('ascii') jpegdata = format_dict.get('image/jpeg') - if jpegdata is not None: - format_dict['image/jpeg'] = 
encodestring(jpegdata).decode('ascii') + if isinstance(jpegdata, bytes): + encoded['image/jpeg'] = encodestring(jpegdata).decode('ascii') + + return encoded class ZMQShellDisplayHook(DisplayHook): @@ -61,8 +64,7 @@ class ZMQShellDisplayHook(DisplayHook): self.msg['content']['execution_count'] = self.prompt_count def write_format_data(self, format_dict): - _encode_binary(format_dict) - self.msg['content']['data'] = format_dict + self.msg['content']['data'] = _encode_binary(format_dict) def finish_displayhook(self): """Finish up all displayhook activities.""" diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index 2775d54..b20ef16 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -382,6 +382,8 @@ class Kernel(Configurable): # runlines. We'll need to clean up this logic later. if shell._reply_content is not None: reply_content.update(shell._reply_content) + e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute') + reply_content['engine_info'] = e_info # reset after use shell._reply_content = None diff --git a/IPython/zmq/zmqshell.py b/IPython/zmq/zmqshell.py index 7d9a7b7..0a0709f 100644 --- a/IPython/zmq/zmqshell.py +++ b/IPython/zmq/zmqshell.py @@ -77,8 +77,7 @@ class ZMQDisplayPublisher(DisplayPublisher): self._validate_data(source, data, metadata) content = {} content['source'] = source - _encode_binary(data) - content['data'] = data + content['data'] = _encode_binary(data) content['metadata'] = metadata self.session.send( self.pub_socket, u'display_data', json_clean(content), diff --git a/docs/examples/parallel/Parallel Magics.ipynb b/docs/examples/parallel/Parallel Magics.ipynb new file mode 100644 index 0000000..e49cd72 --- /dev/null +++ b/docs/examples/parallel/Parallel Magics.ipynb @@ -0,0 +1,228 @@ +{ + "metadata": { + "name": "Parallel Magics" + }, + "nbformat": 3, + "worksheets": [ + { + "cells": [ + { + "cell_type": "heading", + "level": 1, + "source": [ + "Using Parallel Magics" + ] + }, + { + 
"cell_type": "markdown", + "source": [ + "IPython has a few magics for working with your engines.", + "", + "This assumes you have started an IPython cluster, either with the notebook interface,", + "or the `ipcluster/controller/engine` commands." + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "from IPython import parallel", + "rc = parallel.Client()", + "dv = rc[:]", + "dv.block = True", + "dv" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "The parallel magics come from the `parallelmagic` IPython extension.", + "The magics are set to work with a particular View object,", + "so to activate them, you call the `activate()` method on a particular view:" + ] + }, + { + "cell_type": "code", + "collapsed": true, + "input": [ + "dv.activate()" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Now we can execute code remotely with `%px`:" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%px a=5" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%px print a" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%px a" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "You don't have to wait for results:" + ] + }, + { + "cell_type": "code", + "collapsed": true, + "input": [ + "dv.block = False" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%px import time", + "%px time.sleep(5)", + "%px time.time()" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "But you will notice that this didn't output the result of the last command.", + "For this, we have `%result`, which displays the output of the latest request:" + ] + }, + { + "cell_type": "code",
+ "collapsed": false, + "input": [ + "%result" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Remember, an IPython engine is IPython, so you can do magics remotely as well!" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "dv.block = True", + "%px %pylab inline" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "`%%px` can also be used as a cell magic, for submitting whole blocks.", + "This one accepts `--block` and `--noblock` flags to specify", + "the blocking behavior, though the default is unchanged.", + "" + ] + }, + { + "cell_type": "code", + "collapsed": true, + "input": [ + "dv.scatter('id', dv.targets, flatten=True)", + "dv['stride'] = len(dv)" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%%px --noblock", + "x = linspace(0,pi,1000)", + "for n in range(id,12, stride):", + " print n", + " plt.plot(x,sin(n*x))", + "plt.title(\"Plot %i\" % id)" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%result" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "It also lets you control how the outputs of the engines are grouped, with `--group-outputs`:", + "", + "The choices are:", + "", + "* `engine` - all of an engine's output is collected together", + "* `type` - where stdout of each engine is grouped, etc. (the default)", + "* `order` - same as `type`, but individual displaypub outputs are interleaved.", + " That is, it will output the first plot from each engine, then the second from each,", + " etc."
+ ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%%px --group-outputs=engine", + "x = linspace(0,pi,1000)", + "for n in range(id,12, stride):", + " print n", + " plt.plot(x,sin(n*x))", + "plt.title(\"Plot %i\" % id)" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": true, + "input": [ + "" + ], + "language": "python", + "outputs": [] + } + ] + } + ] +} \ No newline at end of file diff --git a/docs/source/parallel/parallel_mpi.txt b/docs/source/parallel/parallel_mpi.txt index 5838010..de0e69a 100644 --- a/docs/source/parallel/parallel_mpi.txt +++ b/docs/source/parallel/parallel_mpi.txt @@ -120,21 +120,19 @@ using our :func:`psum` function: In [1]: from IPython.parallel import Client - In [2]: %load_ext parallel_magic - In [3]: c = Client(profile='mpi') In [4]: view = c[:] - In [5]: view.activate() + In [5]: view.activate() # enable magics # run the contents of the file on each engine: In [6]: view.run('psum.py') - In [6]: px a = np.random.rand(100) + In [6]: %px a = np.random.rand(100) Parallel execution on engines: [0,1,2,3] - In [8]: px s = psum(a) + In [8]: %px s = psum(a) Parallel execution on engines: [0,1,2,3] In [9]: view['s'] diff --git a/docs/source/parallel/parallel_multiengine.txt b/docs/source/parallel/parallel_multiengine.txt index f51fe41..adb8771 100644 --- a/docs/source/parallel/parallel_multiengine.txt +++ b/docs/source/parallel/parallel_multiengine.txt @@ -389,11 +389,11 @@ Parallel magic commands ----------------------- We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) -that make it more pleasant to execute Python commands on the engines -interactively. These are simply shortcuts to :meth:`execute` and -:meth:`get_result` of the :class:`DirectView`.
The ``%px`` magic executes a single -Python command on the engines specified by the :attr:`targets` attribute of the -:class:`DirectView` instance: +that make it a bit more pleasant to execute Python commands on the engines interactively. +These are simply shortcuts to :meth:`.DirectView.execute` +and :meth:`.AsyncResult.display_outputs` methods respectively. +The ``%px`` magic executes a single Python command on the engines +specified by the :attr:`targets` attribute of the :class:`DirectView` instance: .. sourcecode:: ipython @@ -413,43 +413,127 @@ Python command on the engines specified by the :attr:`targets` attribute of the In [27]: %px a = numpy.random.rand(2,2) Parallel execution on engines: [0, 1, 2, 3] - In [28]: %px ev = numpy.linalg.eigvals(a) + In [28]: %px numpy.linalg.eigvals(a) Parallel execution on engines: [0, 1, 2, 3] + [0] Out[68]: array([ 0.77120707, -0.19448286]) + [1] Out[68]: array([ 1.10815921, 0.05110369]) + [2] Out[68]: array([ 0.74625527, -0.37475081]) + [3] Out[68]: array([ 0.72931905, 0.07159743]) + + In [29]: %px print 'hi' + Parallel execution on engine(s): [0, 1, 2, 3] + [stdout:0] hi + [stdout:1] hi + [stdout:2] hi + [stdout:3] hi + + +Since engines are IPython as well, you can even run magics remotely: + +.. sourcecode:: ipython + + In [28]: %px %pylab inline + Parallel execution on engine(s): [0, 1, 2, 3] + [stdout:0] + Welcome to pylab, a matplotlib-based Python environment... + For more information, type 'help(pylab)'. + [stdout:1] + Welcome to pylab, a matplotlib-based Python environment... + For more information, type 'help(pylab)'. + [stdout:2] + Welcome to pylab, a matplotlib-based Python environment... + For more information, type 'help(pylab)'. + [stdout:3] + Welcome to pylab, a matplotlib-based Python environment... + For more information, type 'help(pylab)'. + +And once in pylab mode with the inline backend, +you can make plots and they will be displayed in your frontend +if it supports inline figures (e.g.
notebook or qtconsole): + +.. sourcecode:: ipython + + In [40]: %px plot(rand(100)) + Parallel execution on engine(s): [0, 1, 2, 3] + + + + + [0] Out[79]: [] + [1] Out[79]: [] + [2] Out[79]: [] + [3] Out[79]: [] - In [28]: dv['ev'] - Out[28]: [ array([ 1.09522024, -0.09645227]), - ....: array([ 1.21435496, -0.35546712]), - ....: array([ 0.72180653, 0.07133042]), - ....: array([ 1.46384341, 1.04353244e-04]) - ....: ] -The ``%result`` magic gets the most recent result, or takes an argument -specifying the index of the result to be requested. It is simply a shortcut to the -:meth:`get_result` method: +``%%px`` Cell Magic +******************* + +``%%px`` can also be used as a cell magic, which accepts ``--[no]block`` flags, +and a ``--group-outputs`` argument, which adjusts how the outputs of multiple +engines are presented. + +.. seealso:: + + :meth:`.AsyncResult.display_outputs` for the grouping options. .. sourcecode:: ipython + + In [50]: %%px --block --group-outputs=engine + ....: import numpy as np + ....: A = np.random.random((2,2)) + ....: ev = np.linalg.eigvals(A) + ....: print ev + ....: ev.max() + ....: + Parallel execution on engine(s): [0, 1, 2, 3] + [stdout:0] [ 0.60640442 0.95919621] + [0] Out[73]: 0.9591962130899806 + [stdout:1] [ 0.38501813 1.29430871] + [1] Out[73]: 1.2943087091452372 + [stdout:2] [-0.85925141 0.9387692 ] + [2] Out[73]: 0.93876920456230284 + [stdout:3] [ 0.37998269 1.24218246] + [3] Out[73]: 1.2421824618493817 + +``%result`` Magic +***************** + +If you are using ``%px`` in non-blocking mode, you won't get output. +You can use ``%result`` to display the outputs of the latest command, +just as is done when ``%px`` is blocking: + +..
sourcecode:: ipython + + In [39]: dv.block = False - In [29]: dv.apply_async(lambda : ev) + In [40]: %px print 'hi' + Async parallel execution on engine(s): [0, 1, 2, 3] - In [30]: %result - Out[30]: [ [ 1.28167017 0.14197338], - ....: [-0.14093616 1.27877273], - ....: [-0.37023573 1.06779409], - ....: [ 0.83664764 -0.25602658] ] + In [41]: %result + [stdout:0] hi + [stdout:1] hi + [stdout:2] hi + [stdout:3] hi + +``%result`` simply calls :meth:`.AsyncResult.display_outputs` on the most recent request. +You can pass integers as indices if you want a result other than the latest, +e.g. ``%result -2``, or ``%result 0`` for the first. + + +``%autopx`` +*********** The ``%autopx`` magic switches to a mode where everything you type is executed -on the engines given by the :attr:`targets` attribute: +on the engines until you do ``%autopx`` again. .. sourcecode:: ipython - In [30]: dv.block=False + In [30]: dv.block=True In [31]: %autopx - Auto Parallel Enabled - Type %autopx to disable + %autopx enabled In [32]: max_evals = [] - In [33]: for i in range(100): ....: a = numpy.random.rand(10,10) @@ -457,22 +541,15 @@ on the engines given by the :attr:`targets` attribute: ....: evals = numpy.linalg.eigvals(a) ....: max_evals.append(evals[0].real) ....: - ....: - - - In [34]: %autopx - Auto Parallel Disabled - In [35]: dv.block=True - - In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals)) - Parallel execution on engines: [0, 1, 2, 3] + In [34]: print "Average max eigenvalue is: %f" % (sum(max_evals)/len(max_evals)) + [stdout:0] Average max eigenvalue is: 10.193101 + [stdout:1] Average max eigenvalue is: 10.064508 + [stdout:2] Average max eigenvalue is: 10.055724 + [stdout:3] Average max eigenvalue is: 10.086876 - In [37]: dv['ans'] - Out[37]: [ 'Average max eigenvalue is: 10.1387247332', - ....: 'Average max eigenvalue is: 10.2076902286', - ....: 'Average max eigenvalue is: 10.1891484655', - ....: 'Average max eigenvalue is: 10.1158837784',] + In 
[35]: %autopx + %autopx disabled Moving Python objects around