diff --git a/IPython/core/magic.py b/IPython/core/magic.py index 6d1c9d1..4f39a53 100644 --- a/IPython/core/magic.py +++ b/IPython/core/magic.py @@ -542,7 +542,7 @@ class Magics(object): argv = arg_split(arg_str, posix, strict) # Do regular option processing try: - opts,args = getopt(argv,opt_str,*long_opts) + opts,args = getopt(argv, opt_str, long_opts) except GetoptError,e: raise UsageError('%s ( allowed: "%s" %s)' % (e.msg,opt_str, " ".join(long_opts))) diff --git a/IPython/core/tests/test_magic.py b/IPython/core/tests/test_magic.py index 296d895..b6572dc 100644 --- a/IPython/core/tests/test_magic.py +++ b/IPython/core/tests/test_magic.py @@ -69,6 +69,15 @@ def test_magic_parse_options(): expected = path nt.assert_equals(opts['f'], expected) +def test_magic_parse_long_options(): + """Magic.parse_options can handle --foo=bar long options""" + ip = get_ipython() + m = DummyMagics(ip) + opts, _ = m.parse_options('--foo --bar=bubble', 'a', 'foo', 'bar=') + nt.assert_true('foo' in opts) + nt.assert_true('bar' in opts) + nt.assert_true(opts['bar'], "bubble") + @dec.skip_without('sqlite3') def doctest_hist_f(): diff --git a/IPython/extensions/parallelmagic.py b/IPython/extensions/parallelmagic.py index abb49fb..80c30e2 100644 --- a/IPython/extensions/parallelmagic.py +++ b/IPython/extensions/parallelmagic.py @@ -11,15 +11,15 @@ Usage ``%autopx`` -@AUTOPX_DOC@ +{AUTOPX_DOC} ``%px`` -@PX_DOC@ +{PX_DOC} ``%result`` -@RESULT_DOC@ +{RESULT_DOC} """ @@ -37,87 +37,172 @@ Usage import ast import re -from IPython.core.magic import Magics, magics_class, line_magic +from IPython.core.error import UsageError +from IPython.core.magic import Magics, magics_class, line_magic, cell_magic from IPython.testing.skipdoctest import skip_doctest #----------------------------------------------------------------------------- # Definitions of magic functions for use with IPython #----------------------------------------------------------------------------- -NO_ACTIVE_VIEW = """ -Use activate() on a DirectView object to activate it for magics. -""" + +NO_ACTIVE_VIEW = "Use activate() on a DirectView object to use it with magics." @magics_class class ParallelMagics(Magics): """A set of magics useful when controlling a parallel IPython cluster. """ - - def __init__(self, shell): - super(ParallelMagics, self).__init__(shell) - # A flag showing if autopx is activated or not - self.autopx = False + + # A flag showing if autopx is activated or not + _autopx = False + # the current view used by the magics: + active_view = None @skip_doctest @line_magic def result(self, parameter_s=''): - """Print the result of command i on all engines.. + """Print the result of command i on all engines. To use this a :class:`DirectView` instance must be created and then activated by calling its :meth:`activate` method. + + This lets you recall the results of %px computations after + asynchronous submission (view.block=False). Then you can do the following:: - In [23]: %result - Out[23]: - - [0] In [6]: a = 10 - [1] In [6]: a = 10 - - In [22]: %result 6 - Out[22]: - - [0] In [6]: a = 10 - [1] In [6]: a = 10 + In [23]: %px os.getpid() + Async parallel execution on engine(s): all + + In [24]: %result + [ 8] Out[10]: 60920 + [ 9] Out[10]: 60921 + [10] Out[10]: 60922 + [11] Out[10]: 60923 """ + if self.active_view is None: - print NO_ACTIVE_VIEW - return - + raise UsageError(NO_ACTIVE_VIEW) + + stride = len(self.active_view) try: index = int(parameter_s) except: - index = None - result = self.active_view.get_result(index) - return result + index = -1 + msg_ids = self.active_view.history[stride * index:(stride * (index + 1)) or None] + + result = self.active_view.get_result(msg_ids) + + result.get() + result.display_outputs() @skip_doctest @line_magic def px(self, parameter_s=''): """Executes the given python command in parallel. - + To use this a :class:`DirectView` instance must be created and then activated by calling its :meth:`activate` method. Then you can do the following:: - In [24]: %px a = 5 + In [24]: %px a = os.getpid() Parallel execution on engine(s): all - Out[24]: - - [0] In [7]: a = 5 - [1] In [7]: a = 5 + + In [25]: %px print a + [stdout:0] 1234 + [stdout:1] 1235 + [stdout:2] 1236 + [stdout:3] 1237 """ + return self.parallel_execute(parameter_s) + + def parallel_execute(self, cell, block=None, groupby='type'): + """implementation used by %px and %%parallel""" if self.active_view is None: - print NO_ACTIVE_VIEW - return - print "Parallel execution on engine(s): %s" % self.active_view.targets - result = self.active_view.execute(parameter_s, block=False) - if self.active_view.block: + raise UsageError(NO_ACTIVE_VIEW) + + # defaults: + block = self.active_view.block if block is None else block + + base = "Parallel" if block else "Async parallel" + print base + " execution on engine(s): %s" % self.active_view.targets + + result = self.active_view.execute(cell, silent=False, block=False) + if block: result.get() - self._maybe_display_output(result) + result.display_outputs(groupby) + else: + # return AsyncResult only on non-blocking submission + return result + + @skip_doctest + @cell_magic('px') + def cell_px(self, line='', cell=None): + """Executes the given python command in parallel. + + Cell magic usage: + + %%px [-o] [-e] [--group-options=type|engine|order] [--[no]block] + + Options (%%px cell magic only): + + -o: collate outputs in oder (same as group-outputs=order) + + -e: group outputs by engine (same as group-outputs=engine) + + --group-outputs=type [default behavior]: + each output type (stdout, stderr, displaypub) for all engines + displayed together. + + --group-outputs=order: + The same as 'type', but individual displaypub outputs (e.g. plots) + will be interleaved, so it will display all of the first plots, + then all of the second plots, etc. + + --group-outputs=engine: + All of an engine's output is displayed before moving on to the next. + + --[no]block: + Whether or not to block for the execution to complete + (and display the results). If unspecified, the active view's + + + To use this a :class:`DirectView` instance must be created + and then activated by calling its :meth:`activate` method. + + Then you can do the following:: + + In [24]: %%parallel --noblock a = os.getpid() + Async parallel execution on engine(s): all + + In [25]: %px print a + [stdout:0] 1234 + [stdout:1] 1235 + [stdout:2] 1236 + [stdout:3] 1237 + """ + + block = None + groupby = 'type' + # as a cell magic, we accept args + opts, _ = self.parse_options(line, 'oe', 'group-outputs=', 'block', 'noblock') + + if 'group-outputs' in opts: + groupby = opts['group-outputs'] + elif 'o' in opts: + groupby = 'order' + elif 'e' in opts: + groupby = 'engine' + + if 'block' in opts: + block = True + elif 'noblock' in opts: + block = False + + return self.parallel_execute(cell, block=block, groupby=groupby) @skip_doctest @line_magic @@ -149,7 +234,7 @@ class ParallelMagics(Magics): In [27]: %autopx %autopx disabled """ - if self.autopx: + if self._autopx: self._disable_autopx() else: self._enable_autopx() @@ -159,50 +244,23 @@ class ParallelMagics(Magics): pxrun_cell. """ if self.active_view is None: - print NO_ACTIVE_VIEW - return + raise UsageError(NO_ACTIVE_VIEW) - # override run_cell and run_code + # override run_cell self._original_run_cell = self.shell.run_cell self.shell.run_cell = self.pxrun_cell - self._original_run_code = self.shell.run_code - self.shell.run_code = self.pxrun_code - self.autopx = True + self._autopx = True print "%autopx enabled" def _disable_autopx(self): """Disable %autopx by restoring the original InteractiveShell.run_cell. """ - if self.autopx: + if self._autopx: self.shell.run_cell = self._original_run_cell - self.shell.run_code = self._original_run_code - self.autopx = False + self._autopx = False print "%autopx disabled" - def _maybe_display_output(self, result): - """Maybe display the output of a parallel result. - - If self.active_view.block is True, wait for the result - and display the result. Otherwise, this is a noop. - """ - if isinstance(result.stdout, basestring): - # single result - stdouts = [result.stdout.rstrip()] - else: - stdouts = [s.rstrip() for s in result.stdout] - - targets = self.active_view.targets - if isinstance(targets, int): - targets = [targets] - elif targets == 'all': - targets = self.active_view.client.ids - - if any(stdouts): - for eid,stdout in zip(targets, stdouts): - print '[stdout:%i]'%eid, stdout - - def pxrun_cell(self, raw_cell, store_history=False, silent=False): """drop-in replacement for InteractiveShell.run_cell. @@ -263,47 +321,16 @@ class ParallelMagics(Magics): self.shell.showtraceback() return True else: - self._maybe_display_output(result) - return False - - def pxrun_code(self, code_obj): - """drop-in replacement for InteractiveShell.run_code. - - This executes code remotely, instead of in the local namespace. - - See InteractiveShell.run_code for details. - """ - ipself = self.shell - # check code object for the autopx magic - if 'get_ipython' in code_obj.co_names and 'magic' in code_obj.co_names \ - and any( [ isinstance(c, basestring) and 'autopx' in c - for c in code_obj.co_consts ]): - self._disable_autopx() - return False - else: - try: - result = self.active_view.execute(code_obj, block=False) - except: - ipself.showtraceback() - return True - else: - if self.active_view.block: - try: - result.get() - except: - self.shell.showtraceback() - return True - else: - self._maybe_display_output(result) + with ipself.builtin_trap: + result.display_outputs() return False -__doc__ = __doc__.replace('@AUTOPX_DOC@', - " " + ParallelMagics.autopx.__doc__) -__doc__ = __doc__.replace('@PX_DOC@', - " " + ParallelMagics.px.__doc__) -__doc__ = __doc__.replace('@RESULT_DOC@', - " " + ParallelMagics.result.__doc__) +__doc__ = __doc__.format( + AUTOPX_DOC = ' '*8 + ParallelMagics.autopx.__doc__, + PX_DOC = ' '*8 + ParallelMagics.px.__doc__, + RESULT_DOC = ' '*8 + ParallelMagics.result.__doc__ +) _loaded = False diff --git a/IPython/parallel/client/asyncresult.py b/IPython/parallel/client/asyncresult.py index 5931b6f..1bae6f6 100644 --- a/IPython/parallel/client/asyncresult.py +++ b/IPython/parallel/client/asyncresult.py @@ -21,7 +21,7 @@ from datetime import datetime from zmq import MessageTracker -from IPython.core.display import clear_output +from IPython.core.display import clear_output, display from IPython.external.decorator import decorator from IPython.parallel import error @@ -377,6 +377,140 @@ class AsyncResult(object): sys.stdout.flush() print print "done" + + def _republish_displaypub(self, content, eid): + """republish individual displaypub content dicts""" + try: + ip = get_ipython() + except NameError: + # displaypub is meaningless outside IPython + return + md = content['metadata'] or {} + md['engine'] = eid + ip.display_pub.publish(content['source'], content['data'], md) + + + def _display_single_result(self): + + print self.stdout + print >> sys.stderr, self.stderr + + try: + get_ipython() + except NameError: + # displaypub is meaningless outside IPython + return + + for output in self.outputs: + self._republish_displaypub(output, self.engine_id) + + if self.pyout is not None: + display(self.get()) + + @check_ready + def display_outputs(self, groupby="type"): + """republish the outputs of the computation + + Parameters + ---------- + + groupby : str [default: type] + if 'type': + Group outputs by type (show all stdout, then all stderr, etc.): + + [stdout:1] foo + [stdout:2] foo + [stderr:1] bar + [stderr:2] bar + if 'engine': + Display outputs for each engine before moving on to the next: + + [stdout:1] foo + [stderr:1] bar + [stdout:2] foo + [stderr:2] bar + + if 'order': + Like 'type', but further collate individual displaypub + outputs. This is meant for cases of each command producing + several plots, and you would like to see all of the first + plots together, then all of the second plots, and so on. + """ + # flush iopub, just in case + self._client._flush_iopub(self._client._iopub_socket) + if self._single_result: + self._display_single_result() + return + + stdouts = [s.rstrip() for s in self.stdout] + stderrs = [s.rstrip() for s in self.stderr] + pyouts = [p for p in self.pyout] + output_lists = self.outputs + results = self.get() + + targets = self.engine_id + + if groupby == "engine": + for eid,stdout,stderr,outputs,r,pyout in zip( + targets, stdouts, stderrs, output_lists, results, pyouts + ): + if stdout: + print '[stdout:%i]' % eid, stdout + if stderr: + print >> sys.stderr, '[stderr:%i]' % eid, stderr + + try: + get_ipython() + except NameError: + # displaypub is meaningless outside IPython + return + + for output in outputs: + self._republish_displaypub(output, eid) + + if pyout is not None: + display(r) + + elif groupby in ('type', 'order'): + # republish stdout: + if any(stdouts): + for eid,stdout in zip(targets, stdouts): + print '[stdout:%i]' % eid, stdout + + # republish stderr: + if any(stderrs): + for eid,stderr in zip(targets, stderrs): + print >> sys.stderr, '[stderr:%i]' % eid, stderr + + try: + get_ipython() + except NameError: + # displaypub is meaningless outside IPython + return + + if groupby == 'order': + output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists)) + N = max(len(outputs) for outputs in output_lists) + for i in range(N): + for eid in targets: + outputs = output_dict[eid] + if len(outputs) >= N: + self._republish_displaypub(outputs[i], eid) + else: + # republish displaypub output + for eid,outputs in zip(targets, output_lists): + for output in outputs: + self._republish_displaypub(output, eid) + + # finally, add pyout: + for eid,r,pyout in zip(targets, results, pyouts): + if pyout is not None: + display(r) + + else: + raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby) + + class AsyncMapResult(AsyncResult): diff --git a/IPython/parallel/client/client.py b/IPython/parallel/client/client.py index 7770087..0d35303 100644 --- a/IPython/parallel/client/client.py +++ b/IPython/parallel/client/client.py @@ -33,6 +33,7 @@ import zmq from IPython.config.configurable import MultipleInstanceError from IPython.core.application import BaseIPythonApplication +from IPython.utils.coloransi import TermColors from IPython.utils.jsonutil import rekey from IPython.utils.localinterfaces import LOCAL_IPS from IPython.utils.path import get_ipython_dir @@ -90,13 +91,39 @@ class ExecuteReply(object): return self.metadata[key] def __repr__(self): - pyout = self.metadata['pyout'] or {} - text_out = pyout.get('data', {}).get('text/plain', '') + pyout = self.metadata['pyout'] or {'data':{}} + text_out = pyout['data'].get('text/plain', '') if len(text_out) > 32: text_out = text_out[:29] + '...' return "" % (self.execution_count, text_out) + def _repr_pretty_(self, p, cycle): + pyout = self.metadata['pyout'] or {'data':{}} + text_out = pyout['data'].get('text/plain', '') + + if not text_out: + return + + try: + ip = get_ipython() + except NameError: + colors = "NoColor" + else: + colors = ip.colors + + if colors == "NoColor": + out = normal = "" + else: + out = TermColors.Red + normal = TermColors.Normal + + p.text( + u'[%i] ' % self.metadata['engine_id'] + + out + u'Out[%i]: ' % self.execution_count + + normal + text_out + ) + def _repr_html_(self): pyout = self.metadata['pyout'] or {'data':{}} return pyout['data'].get("text/html") diff --git a/IPython/parallel/tests/clienttest.py b/IPython/parallel/tests/clienttest.py index b902293..60017b6 100644 --- a/IPython/parallel/tests/clienttest.py +++ b/IPython/parallel/tests/clienttest.py @@ -15,6 +15,7 @@ Authors: import sys import tempfile import time +from StringIO import StringIO from nose import SkipTest @@ -59,6 +60,28 @@ def raiser(eclass): """raise an exception""" raise eclass() +def generate_output(): + """function for testing output + + publishes two outputs of each type, and returns + a rich displayable object. + """ + + import sys + from IPython.core.display import display, HTML, Math + + print "stdout" + print >> sys.stderr, "stderr" + + display(HTML("HTML")) + + print "stdout2" + print >> sys.stderr, "stderr2" + + display(Math(r"\alpha=\beta")) + + return Math("42") + # test decorator for skipping tests when libraries are unavailable def skip_without(*names): """skip a test if some names are not importable""" @@ -73,6 +96,41 @@ def skip_without(*names): return f(*args, **kwargs) return skip_without_names +#------------------------------------------------------------------------------- +# Classes +#------------------------------------------------------------------------------- + +class CapturedIO(object): + """Simple object for containing captured stdout/err StringIO objects""" + + def __init__(self, stdout, stderr): + self.stdout_io = stdout + self.stderr_io = stderr + + @property + def stdout(self): + return self.stdout_io.getvalue() + + @property + def stderr(self): + return self.stderr_io.getvalue() + + +class capture_output(object): + """context manager for capturing stdout/err""" + + def __enter__(self): + self.sys_stdout = sys.stdout + self.sys_stderr = sys.stderr + stdout = sys.stdout = StringIO() + stderr = sys.stderr = StringIO() + return CapturedIO(stdout, stderr) + + def __exit__(self, exc_type, exc_value, traceback): + sys.stdout = self.sys_stdout + sys.stderr = self.sys_stderr + + class ClusterTestCase(BaseZMQTestCase): def add_engines(self, n=1, block=True): @@ -117,6 +175,17 @@ class ClusterTestCase(BaseZMQTestCase): else: self.fail("should have raised a RemoteError") + def _wait_for(self, f, timeout=10): + """wait for a condition""" + tic = time.time() + while time.time() <= tic + timeout: + if f(): + return + time.sleep(0.1) + self.client.spin() + if not f(): + print "Warning: Awaited condition never arrived" + def setUp(self): BaseZMQTestCase.setUp(self) self.client = self.connect_client() diff --git a/IPython/parallel/tests/test_magics.py b/IPython/parallel/tests/test_magics.py new file mode 100644 index 0000000..5d5482d --- /dev/null +++ b/IPython/parallel/tests/test_magics.py @@ -0,0 +1,342 @@ +# -*- coding: utf-8 -*- +"""Test Parallel magics + +Authors: + +* Min RK +""" +#------------------------------------------------------------------------------- +# Copyright (C) 2011 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#------------------------------------------------------------------------------- + +#------------------------------------------------------------------------------- +# Imports +#------------------------------------------------------------------------------- + +import sys +import time + +import zmq +from nose import SkipTest + +from IPython.testing import decorators as dec +from IPython.testing.ipunittest import ParametricTestCase + +from IPython import parallel as pmod +from IPython.parallel import error +from IPython.parallel import AsyncResult +from IPython.parallel.util import interactive + +from IPython.parallel.tests import add_engines + +from .clienttest import ClusterTestCase, capture_output, generate_output + +def setup(): + add_engines(3, total=True) + +class TestParallelMagics(ClusterTestCase, ParametricTestCase): + + def test_px_blocking(self): + ip = get_ipython() + v = self.client[-1:] + v.activate() + v.block=True + + ip.magic('px a=5') + self.assertEquals(v['a'], [5]) + ip.magic('px a=10') + self.assertEquals(v['a'], [10]) + # just 'print a' works ~99% of the time, but this ensures that + # the stdout message has arrived when the result is finished: + with capture_output() as io: + ip.magic( + 'px import sys,time;print(a);sys.stdout.flush();time.sleep(0.2)' + ) + out = io.stdout + self.assertTrue('[stdout:' in out, out) + self.assertTrue(out.rstrip().endswith('10')) + self.assertRaisesRemote(ZeroDivisionError, ip.magic, 'px 1/0') + + def test_cellpx_block_args(self): + """%%px --[no]block flags work""" + ip = get_ipython() + v = self.client[-1:] + v.activate() + v.block=False + + for block in (True, False): + v.block = block + + with capture_output() as io: + ip.run_cell_magic("px", "", "1") + if block: + self.assertTrue(io.stdout.startswith("Parallel"), io.stdout) + else: + self.assertTrue(io.stdout.startswith("Async"), io.stdout) + + with capture_output() as io: + ip.run_cell_magic("px", "--block", "1") + self.assertTrue(io.stdout.startswith("Parallel"), io.stdout) + + with capture_output() as io: + ip.run_cell_magic("px", "--noblock", "1") + self.assertTrue(io.stdout.startswith("Async"), io.stdout) + + def test_cellpx_groupby_engine(self): + """%%px --group-outputs=engine""" + ip = get_ipython() + v = self.client[:] + v.block = True + v.activate() + + v['generate_output'] = generate_output + + with capture_output() as io: + ip.run_cell_magic('px', '--group-outputs=engine', 'generate_output()') + + lines = io.stdout.strip().splitlines()[1:] + expected = [ + ('[stdout:', '] stdout'), + 'stdout2', + 'IPython.core.display.HTML', + 'IPython.core.display.Math', + ('] Out[', 'IPython.core.display.Math') + ] * len(v) + + self.assertEquals(len(lines), len(expected), io.stdout) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + expected = [ + ('[stderr:', '] stderr'), + 'stderr2', + ] * len(v) + + lines = io.stderr.strip().splitlines() + self.assertEquals(len(lines), len(expected), io.stderr) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + + def test_cellpx_groupby_order(self): + """%%px --group-outputs=order""" + ip = get_ipython() + v = self.client[:] + v.block = True + v.activate() + + v['generate_output'] = generate_output + + with capture_output() as io: + ip.run_cell_magic('px', '--group-outputs=order', 'generate_output()') + + lines = io.stdout.strip().splitlines()[1:] + expected = [] + expected.extend([ + ('[stdout:', '] stdout'), + 'stdout2', + ] * len(v)) + expected.extend([ + 'IPython.core.display.HTML', + ] * len(v)) + expected.extend([ + 'IPython.core.display.Math', + ] * len(v)) + expected.extend([ + ('] Out[', 'IPython.core.display.Math') + ] * len(v)) + + self.assertEquals(len(lines), len(expected), io.stdout) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + expected = [ + ('[stderr:', '] stderr'), + 'stderr2', + ] * len(v) + + lines = io.stderr.strip().splitlines() + self.assertEquals(len(lines), len(expected), io.stderr) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + def test_cellpx_groupby_atype(self): + """%%px --group-outputs=type""" + ip = get_ipython() + v = self.client[:] + v.block = True + v.activate() + + v['generate_output'] = generate_output + + with capture_output() as io: + ip.run_cell_magic('px', '--group-outputs=type', 'generate_output()') + + lines = io.stdout.strip().splitlines()[1:] + + expected = [] + expected.extend([ + ('[stdout:', '] stdout'), + 'stdout2', + ] * len(v)) + expected.extend([ + 'IPython.core.display.HTML', + 'IPython.core.display.Math', + ] * len(v)) + expected.extend([ + ('] Out[', 'IPython.core.display.Math') + ] * len(v)) + + self.assertEquals(len(lines), len(expected), io.stdout) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + expected = [ + ('[stderr:', '] stderr'), + 'stderr2', + ] * len(v) + + lines = io.stderr.strip().splitlines() + self.assertEquals(len(lines), len(expected), io.stderr) + for line,expect in zip(lines, expected): + if isinstance(expect, str): + expect = [expect] + for ex in expect: + self.assertTrue(ex in line, "Expected %r in %r" % (ex, line)) + + + def test_px_nonblocking(self): + ip = get_ipython() + v = self.client[-1:] + v.activate() + v.block=False + + ip.magic('px a=5') + self.assertEquals(v['a'], [5]) + ip.magic('px a=10') + self.assertEquals(v['a'], [10]) + with capture_output() as io: + ar = ip.magic('px print (a)') + self.assertTrue(isinstance(ar, AsyncResult)) + self.assertTrue('Async' in io.stdout) + self.assertFalse('[stdout:' in io.stdout) + ar = ip.magic('px 1/0') + self.assertRaisesRemote(ZeroDivisionError, ar.get) + + def test_autopx_blocking(self): + ip = get_ipython() + v = self.client[-1] + v.activate() + v.block=True + + with capture_output() as io: + ip.magic('autopx') + ip.run_cell('\n'.join(('a=5','b=12345','c=0'))) + ip.run_cell('b*=2') + ip.run_cell('print (b)') + ip.run_cell('b') + ip.run_cell("b/c") + ip.magic('autopx') + + output = io.stdout.strip() + + self.assertTrue(output.startswith('%autopx enabled'), output) + self.assertTrue(output.endswith('%autopx disabled'), output) + self.assertTrue('RemoteError: ZeroDivisionError' in output, output) + self.assertTrue('] Out[' in output, output) + self.assertTrue(': 24690' in output, output) + ar = v.get_result(-1) + self.assertEquals(v['a'], 5) + self.assertEquals(v['b'], 24690) + self.assertRaisesRemote(ZeroDivisionError, ar.get) + + def test_autopx_nonblocking(self): + ip = get_ipython() + v = self.client[-1] + v.activate() + v.block=False + + with capture_output() as io: + ip.magic('autopx') + ip.run_cell('\n'.join(('a=5','b=10','c=0'))) + ip.run_cell('print (b)') + ip.run_cell('import time; time.sleep(0.1)') + ip.run_cell("b/c") + ip.run_cell('b*=2') + ip.magic('autopx') + + output = io.stdout.strip() + + self.assertTrue(output.startswith('%autopx enabled')) + self.assertTrue(output.endswith('%autopx disabled')) + self.assertFalse('ZeroDivisionError' in output) + ar = v.get_result(-2) + self.assertRaisesRemote(ZeroDivisionError, ar.get) + # prevent TaskAborted on pulls, due to ZeroDivisionError + time.sleep(0.5) + self.assertEquals(v['a'], 5) + # b*=2 will not fire, due to abort + self.assertEquals(v['b'], 10) + + def test_result(self): + ip = get_ipython() + v = self.client[-1] + v.activate() + data = dict(a=111,b=222) + v.push(data, block=True) + + ip.magic('px a') + ip.magic('px b') + for idx, name in [ + ('', 'b'), + ('-1', 'b'), + ('2', 'b'), + ('1', 'a'), + ('-2', 'a'), + ]: + with capture_output() as io: + ip.magic('result ' + idx) + output = io.stdout.strip() + msg = "expected %s output to include %s, but got: %s" % \ + ('%result '+idx, str(data[name]), output) + self.assertTrue(str(data[name]) in output, msg) + + @dec.skipif_not_matplotlib + def test_px_pylab(self): + """%pylab works on engines""" + ip = get_ipython() + v = self.client[-1] + v.block = True + v.activate() + + with capture_output() as io: + ip.magic("px %pylab inline") + + self.assertTrue("Welcome to pylab" in io.stdout, io.stdout) + self.assertTrue("backend_inline" in io.stdout, io.stdout) + + with capture_output() as io: + ip.magic("px plot(rand(100))") + + self.assertTrue('] Out[' in io.stdout, io.stdout) + self.assertTrue('matplotlib.lines' in io.stdout, io.stdout) + + diff --git a/IPython/parallel/tests/test_view.py b/IPython/parallel/tests/test_view.py index d8cc803..e39b494 100644 --- a/IPython/parallel/tests/test_view.py +++ b/IPython/parallel/tests/test_view.py @@ -366,118 +366,6 @@ class TestView(ClusterTestCase, ParametricTestCase): self.assertEquals(view.apply_sync(findall, '\w+', 'hello world'), 'hello world'.split()) - # parallel magic tests - - def test_magic_px_blocking(self): - ip = get_ipython() - v = self.client[-1] - v.activate() - v.block=True - - ip.magic('px a=5') - self.assertEquals(v['a'], 5) - ip.magic('px a=10') - self.assertEquals(v['a'], 10) - sio = StringIO() - savestdout = sys.stdout - sys.stdout = sio - # just 'print a' worst ~99% of the time, but this ensures that - # the stdout message has arrived when the result is finished: - ip.magic('px import sys,time;print (a); sys.stdout.flush();time.sleep(0.2)') - sys.stdout = savestdout - buf = sio.getvalue() - self.assertTrue('[stdout:' in buf, buf) - self.assertTrue(buf.rstrip().endswith('10')) - self.assertRaisesRemote(ZeroDivisionError, ip.magic, 'px 1/0') - - def test_magic_px_nonblocking(self): - ip = get_ipython() - v = self.client[-1] - v.activate() - v.block=False - - ip.magic('px a=5') - self.assertEquals(v['a'], 5) - ip.magic('px a=10') - self.assertEquals(v['a'], 10) - sio = StringIO() - savestdout = sys.stdout - sys.stdout = sio - ip.magic('px print a') - sys.stdout = savestdout - buf = sio.getvalue() - self.assertFalse('[stdout:%i]'%v.targets in buf) - ip.magic('px 1/0') - ar = v.get_result(-1) - self.assertRaisesRemote(ZeroDivisionError, ar.get) - - def test_magic_autopx_blocking(self): - ip = get_ipython() - v = self.client[-1] - v.activate() - v.block=True - - sio = StringIO() - savestdout = sys.stdout - sys.stdout = sio - ip.magic('autopx') - ip.run_cell('\n'.join(('a=5','b=10','c=0'))) - ip.run_cell('b*=2') - ip.run_cell('print (b)') - ip.run_cell("b/c") - ip.magic('autopx') - sys.stdout = savestdout - output = sio.getvalue().strip() - self.assertTrue(output.startswith('%autopx enabled')) - self.assertTrue(output.endswith('%autopx disabled')) - self.assertTrue('RemoteError: ZeroDivisionError' in output) - ar = v.get_result(-1) - self.assertEquals(v['a'], 5) - self.assertEquals(v['b'], 20) - self.assertRaisesRemote(ZeroDivisionError, ar.get) - - def test_magic_autopx_nonblocking(self): - ip = get_ipython() - v = self.client[-1] - v.activate() - v.block=False - - sio = StringIO() - savestdout = sys.stdout - sys.stdout = sio - ip.magic('autopx') - ip.run_cell('\n'.join(('a=5','b=10','c=0'))) - ip.run_cell('print (b)') - ip.run_cell('import time; time.sleep(0.1)') - ip.run_cell("b/c") - ip.run_cell('b*=2') - ip.magic('autopx') - sys.stdout = savestdout - output = sio.getvalue().strip() - self.assertTrue(output.startswith('%autopx enabled')) - self.assertTrue(output.endswith('%autopx disabled')) - self.assertFalse('ZeroDivisionError' in output) - ar = v.get_result(-2) - self.assertRaisesRemote(ZeroDivisionError, ar.get) - # prevent TaskAborted on pulls, due to ZeroDivisionError - time.sleep(0.5) - self.assertEquals(v['a'], 5) - # b*=2 will not fire, due to abort - self.assertEquals(v['b'], 10) - - def test_magic_result(self): - ip = get_ipython() - v = self.client[-1] - v.activate() - v['a'] = 111 - ra = v['a'] - - ar = ip.magic('result') - self.assertEquals(ar.msg_ids, [v.history[-1]]) - self.assertEquals(ar.get(), 111) - ar = ip.magic('result -2') - self.assertEquals(ar.msg_ids, [v.history[-2]]) - def test_unicode_execute(self): """test executing unicode strings""" v = self.client[-1] @@ -575,16 +463,6 @@ class TestView(ClusterTestCase, ParametricTestCase): # begin execute tests - def _wait_for(self, f, timeout=10): - tic = time.time() - while time.time() <= tic + timeout: - if f(): - return - time.sleep(0.1) - self.client.spin() - if not f(): - print "Warning: Awaited condition never arrived" - def test_execute_reply(self): e0 = self.client[self.client.ids[0]] diff --git a/IPython/zmq/displayhook.py b/IPython/zmq/displayhook.py index a3f76ab..a3d9178 100644 --- a/IPython/zmq/displayhook.py +++ b/IPython/zmq/displayhook.py @@ -31,12 +31,15 @@ class ZMQDisplayHook(object): def _encode_binary(format_dict): + encoded = format_dict.copy() pngdata = format_dict.get('image/png') - if pngdata is not None: - format_dict['image/png'] = encodestring(pngdata).decode('ascii') + if isinstance(pngdata, bytes): + encoded['image/png'] = encodestring(pngdata).decode('ascii') jpegdata = format_dict.get('image/jpeg') - if jpegdata is not None: - format_dict['image/jpeg'] = encodestring(jpegdata).decode('ascii') + if isinstance(jpegdata, bytes): + encoded['image/jpeg'] = encodestring(jpegdata).decode('ascii') + + return encoded class ZMQShellDisplayHook(DisplayHook): @@ -61,8 +64,7 @@ class ZMQShellDisplayHook(DisplayHook): self.msg['content']['execution_count'] = self.prompt_count def write_format_data(self, format_dict): - _encode_binary(format_dict) - self.msg['content']['data'] = format_dict + self.msg['content']['data'] = _encode_binary(format_dict) def finish_displayhook(self): """Finish up all displayhook activities.""" diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index 2775d54..b20ef16 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -382,6 +382,8 @@ class Kernel(Configurable): # runlines. We'll need to clean up this logic later. if shell._reply_content is not None: reply_content.update(shell._reply_content) + e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='execute') + reply_content['engine_info'] = e_info # reset after use shell._reply_content = None diff --git a/IPython/zmq/zmqshell.py b/IPython/zmq/zmqshell.py index 7d9a7b7..0a0709f 100644 --- a/IPython/zmq/zmqshell.py +++ b/IPython/zmq/zmqshell.py @@ -77,8 +77,7 @@ class ZMQDisplayPublisher(DisplayPublisher): self._validate_data(source, data, metadata) content = {} content['source'] = source - _encode_binary(data) - content['data'] = data + content['data'] = _encode_binary(data) content['metadata'] = metadata self.session.send( self.pub_socket, u'display_data', json_clean(content), diff --git a/docs/examples/parallel/Parallel Magics.ipynb b/docs/examples/parallel/Parallel Magics.ipynb new file mode 100644 index 0000000..e49cd72 --- /dev/null +++ b/docs/examples/parallel/Parallel Magics.ipynb @@ -0,0 +1,228 @@ +{ + "metadata": { + "name": "Parallel Magics" + }, + "nbformat": 3, + "worksheets": [ + { + "cells": [ + { + "cell_type": "heading", + "level": 1, + "source": [ + "Using Parallel Magics" + ] + }, + { + "cell_type": "markdown", + "source": [ + "IPython has a few magics for working with your engines.", + "", + "This assumes you have started an IPython cluster, either with the notebook interface,", + "or the `ipcluster/controller/engine` commands." + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "from IPython import parallel", + "rc = parallel.Client()", + "dv = rc[:]", + "dv.block = True", + "dv" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "The parallel magics come from the `parallelmagics` IPython extension.", + "The magics are set to work with a particular View object,", + "so to activate them, you call the `activate()` method on a particular view:" + ] + }, + { + "cell_type": "code", + "collapsed": true, + "input": [ + "dv.activate()" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Now we can execute code remotely with `%px`:" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%px a=5" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%px print a" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%px a" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "You don't have to wait for results:" + ] + }, + { + "cell_type": "code", + "collapsed": true, + "input": [ + "dv.block = False" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%px import time", + "%px time.sleep(5)", + "%px time.time()" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "But you will notice that this didn't output the result of the last command.", + "For this, we have `%result`, which displays the output of the latest request:" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%result" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "Remember, an IPython engine is IPython, so you can do magics remotely as well!" + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "dv.block = True", + "%px %pylab inline" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "`%%px` can also be used as a cell magic, for submitting whole blocks.", + "This one acceps `--block` and `--noblock` flags to specify", + "the blocking behavior, though the default is unchanged.", + "" + ] + }, + { + "cell_type": "code", + "collapsed": true, + "input": [ + "dv.scatter('id', dv.targets, flatten=True)", + "dv['stride'] = len(dv)" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%%px --noblock", + "x = linspace(0,pi,1000)", + "for n in range(id,12, stride):", + " print n", + " plt.plot(x,sin(n*x))", + "plt.title(\"Plot %i\" % id)" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%result" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "markdown", + "source": [ + "It also lets you choose some amount of the grouping of the outputs with `--group-outputs`:", + "", + "The choices are:", + "", + "* `engine` - all of an engine's output is collected together", + "* `type` - where stdout of each engine is grouped, etc. (the default)", + "* `order` - same as `type`, but individual displaypub outputs are interleaved.", + " That is, it will output the first plot from each engine, then the second from each,", + " etc." + ] + }, + { + "cell_type": "code", + "collapsed": false, + "input": [ + "%%px --group-outputs=engine", + "x = linspace(0,pi,1000)", + "for n in range(id,12, stride):", + " print n", + " plt.plot(x,sin(n*x))", + "plt.title(\"Plot %i\" % id)" + ], + "language": "python", + "outputs": [] + }, + { + "cell_type": "code", + "collapsed": true, + "input": [ + "" + ], + "language": "python", + "outputs": [] + } + ] + } + ] +} \ No newline at end of file diff --git a/docs/source/parallel/parallel_mpi.txt b/docs/source/parallel/parallel_mpi.txt index 5838010..de0e69a 100644 --- a/docs/source/parallel/parallel_mpi.txt +++ b/docs/source/parallel/parallel_mpi.txt @@ -120,21 +120,19 @@ using our :func:`psum` function: In [1]: from IPython.parallel import Client - In [2]: %load_ext parallel_magic - In [3]: c = Client(profile='mpi') In [4]: view = c[:] - In [5]: view.activate() + In [5]: view.activate() # enabe magics # run the contents of the file on each engine: In [6]: view.run('psum.py') - In [6]: px a = np.random.rand(100) + In [6]: %px a = np.random.rand(100) Parallel execution on engines: [0,1,2,3] - In [8]: px s = psum(a) + In [8]: %px s = psum(a) Parallel execution on engines: [0,1,2,3] In [9]: view['s'] diff --git a/docs/source/parallel/parallel_multiengine.txt b/docs/source/parallel/parallel_multiengine.txt index f51fe41..adb8771 100644 --- a/docs/source/parallel/parallel_multiengine.txt +++ b/docs/source/parallel/parallel_multiengine.txt @@ -389,11 +389,11 @@ Parallel magic commands ----------------------- We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) -that make it more pleasant to execute Python commands on the engines -interactively. These are simply shortcuts to :meth:`execute` and -:meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single -Python command on the engines specified by the :attr:`targets` attribute of the -:class:`DirectView` instance: +that make it a bit more pleasant to execute Python commands on the engines interactively. +These are simply shortcuts to :meth:`.DirectView.execute` +and :meth:`.AsyncResult.display_outputs` methods repsectively. +The ``%px`` magic executes a single Python command on the engines +specified by the :attr:`targets` attribute of the :class:`DirectView` instance: .. sourcecode:: ipython @@ -413,43 +413,127 @@ Python command on the engines specified by the :attr:`targets` attribute of the In [27]: %px a = numpy.random.rand(2,2) Parallel execution on engines: [0, 1, 2, 3] - In [28]: %px ev = numpy.linalg.eigvals(a) + In [28]: %px numpy.linalg.eigvals(a) Parallel execution on engines: [0, 1, 2, 3] + [0] Out[68]: array([ 0.77120707, -0.19448286]) + [1] Out[68]: array([ 1.10815921, 0.05110369]) + [2] Out[68]: array([ 0.74625527, -0.37475081]) + [3] Out[68]: array([ 0.72931905, 0.07159743]) + + In [29]: %px print 'hi' + Parallel execution on engine(s): [0, 1, 2, 3] + [stdout:0] hi + [stdout:1] hi + [stdout:2] hi + [stdout:3] hi + + +Since engines are IPython as well, you can even run magics remotely: + +.. sourcecode:: ipython + + In [28]: %px %pylab inline + Parallel execution on engine(s): [0, 1, 2, 3] + [stdout:0] + Welcome to pylab, a matplotlib-based Python environment... + For more information, type 'help(pylab)'. + [stdout:1] + Welcome to pylab, a matplotlib-based Python environment... + For more information, type 'help(pylab)'. + [stdout:2] + Welcome to pylab, a matplotlib-based Python environment... + For more information, type 'help(pylab)'. + [stdout:3] + Welcome to pylab, a matplotlib-based Python environment... + For more information, type 'help(pylab)'. + +And once in pylab mode with the inline backend, +you can make plots and they will be displayed in your frontend +if it suports the inline figures (e.g. notebook or qtconsole): + +.. sourcecode:: ipython + + In [40]: %px plot(rand(100)) + Parallel execution on engine(s): [0, 1, 2, 3] + + + + + [0] Out[79]: [] + [1] Out[79]: [] + [2] Out[79]: [] + [3] Out[79]: [] - In [28]: dv['ev'] - Out[28]: [ array([ 1.09522024, -0.09645227]), - ....: array([ 1.21435496, -0.35546712]), - ....: array([ 0.72180653, 0.07133042]), - ....: array([ 1.46384341, 1.04353244e-04]) - ....: ] -The ``%result`` magic gets the most recent result, or takes an argument -specifying the index of the result to be requested. It is simply a shortcut to the -:meth:`get_result` method: +``%%px`` Cell Magic +******************* + +`%%px` can also be used as a Cell Magic, which accepts ``--[no]block`` flags, +and a ``--group-outputs`` argument, which adjust how the outputs of multiple +engines are presented. + +.. seealso:: + + :meth:`.AsyncResult.display_outputs` for the grouping options. .. sourcecode:: ipython + + In [50]: %%px --block --group-outputs=engine + ....: import numpy as np + ....: A = np.random.random((2,2)) + ....: ev = numpy.linalg.eigvals(A) + ....: print ev + ....: ev.max() + ....: + Parallel execution on engine(s): [0, 1, 2, 3] + [stdout:0] [ 0.60640442 0.95919621] + [0] Out[73]: 0.9591962130899806 + [stdout:1] [ 0.38501813 1.29430871] + [1] Out[73]: 1.2943087091452372 + [stdout:2] [-0.85925141 0.9387692 ] + [2] Out[73]: 0.93876920456230284 + [stdout:3] [ 0.37998269 1.24218246] + [3] Out[73]: 1.2421824618493817 + +``%result`` Magic +***************** + +If you are using ``%px`` in non-blocking mode, you won't get output. +You can use ``%result`` to display the outputs of the latest command, +just as is done when ``%px`` is blocking: + +.. sourcecode:: ipython + + In [39]: dv.block = False - In [29]: dv.apply_async(lambda : ev) + In [40]: %px print 'hi' + Async parallel execution on engine(s): [0, 1, 2, 3] - In [30]: %result - Out[30]: [ [ 1.28167017 0.14197338], - ....: [-0.14093616 1.27877273], - ....: [-0.37023573 1.06779409], - ....: [ 0.83664764 -0.25602658] ] + In [41]: %result + [stdout:0] hi + [stdout:1] hi + [stdout:2] hi + [stdout:3] hi + +``%result`` simply calls :meth:`.AsyncResult.display_outputs` on the most recent request. +You can pass integers as indices if you want a result other than the latest, +e.g. ``%result -2``, or ``%result 0`` for the first. + + +``%autopx`` +*********** The ``%autopx`` magic switches to a mode where everything you type is executed -on the engines given by the :attr:`targets` attribute: +on the engines until you do ``%autopx`` again. .. sourcecode:: ipython - In [30]: dv.block=False + In [30]: dv.block=True In [31]: %autopx - Auto Parallel Enabled - Type %autopx to disable + %autopx enabled In [32]: max_evals = [] - In [33]: for i in range(100): ....: a = numpy.random.rand(10,10) @@ -457,22 +541,15 @@ on the engines given by the :attr:`targets` attribute: ....: evals = numpy.linalg.eigvals(a) ....: max_evals.append(evals[0].real) ....: - ....: - - - In [34]: %autopx - Auto Parallel Disabled - In [35]: dv.block=True - - In [36]: px ans= "Average max eigenvalue is: %f"%(sum(max_evals)/len(max_evals)) - Parallel execution on engines: [0, 1, 2, 3] + In [34]: print "Average max eigenvalue is: %f" % (sum(max_evals)/len(max_evals)) + [stdout:0] Average max eigenvalue is: 10.193101 + [stdout:1] Average max eigenvalue is: 10.064508 + [stdout:2] Average max eigenvalue is: 10.055724 + [stdout:3] Average max eigenvalue is: 10.086876 - In [37]: dv['ans'] - Out[37]: [ 'Average max eigenvalue is: 10.1387247332', - ....: 'Average max eigenvalue is: 10.2076902286', - ....: 'Average max eigenvalue is: 10.1891484655', - ....: 'Average max eigenvalue is: 10.1158837784',] + In [35]: %autopx + Auto Parallel Disabled Moving Python objects around