Republish parallel outputs from AsyncResult instead of the %px magics.

ParallelMagics._display_result is removed; %result, %px and %autopx now call
the new AsyncResult.display_outputs(groupby=...), which accepts 'type'
(default), 'engine' or 'order'.  The now-unused `sys` and `display` imports
are dropped from parallelmagic.py.

Review fixes folded into the added AsyncResult code:
  * groupby='order' tested len(outputs) >= N, so an engine with fewer
    displaypub outputs than the busiest engine published nothing at all;
    it now tests len(outputs) > i.
  * groupby='engine' printed the [stderr:N] lines to stdout; they now go to
    sys.stderr, as in the 'type'/'order' branch.
  * groupby='engine' returned from inside the per-engine loop when run
    outside IPython, dropping stdout/stderr of every engine after the
    first; it now continues with the next engine.
  * the ValueError message advertised 'collate', which is not an accepted
    value; it now names 'order'.
  * _display_single_result printed stdout/stderr even when empty; the
    prints are now guarded, and the method has a docstring.

NOTE(review): this patch was recovered from a copy whose newlines had been
collapsed.  Line breaks, blank lines and indentation (including context
lines) are reconstructed to match the hunk line counts; check them against
the tree before applying.  The asyncresult.py hunk count (+377,149) and the
post-image blob id on its "index" line no longer describe the original
commit.

diff --git a/IPython/extensions/parallelmagic.py b/IPython/extensions/parallelmagic.py
index 9d82663..31a927f 100644
--- a/IPython/extensions/parallelmagic.py
+++ b/IPython/extensions/parallelmagic.py
@@ -36,9 +36,7 @@ Usage
 
 import ast
 import re
-import sys
 
-from IPython.core.display import display
 from IPython.core.error import UsageError
 from IPython.core.magic import Magics, magics_class, line_magic
 from IPython.testing.skipdoctest import skip_doctest
@@ -64,7 +62,7 @@ class ParallelMagics(Magics):
     @skip_doctest
     @line_magic
     def result(self, parameter_s=''):
-        """Print the result of command i on all engines..
+        """Print the result of command i on all engines.
 
         To use this a :class:`DirectView` instance must be created
         and then activated by calling its :meth:`activate` method.
@@ -97,7 +95,7 @@ class ParallelMagics(Magics):
         result = self.active_view.get_result(msg_ids)
 
         result.get()
-        self._display_result(result)
+        result.display_outputs()
 
     @skip_doctest
     @line_magic
@@ -128,7 +126,7 @@ class ParallelMagics(Magics):
         result = self.active_view.execute(parameter_s, silent=False, block=False)
         if self.active_view.block:
             result.get()
-            self._display_result(result)
+            result.display_outputs()
 
     @skip_doctest
     @line_magic
@@ -187,54 +185,6 @@ class ParallelMagics(Magics):
             self._autopx = False
             print "%autopx disabled"
 
-    def _display_result(self, result):
-        """Display the output of a parallel result.
-        """
-        # flush iopub, just in case
-        rc = self.active_view.client
-        rc._flush_iopub(rc._iopub_socket)
-
-        if result._single_result:
-            # single result
-            stdouts = [result.stdout.rstrip()]
-            stderrs = [result.stderr.rstrip()]
-            outputs = [result.outputs]
-            results = [result.get()]
-        else:
-            stdouts = [s.rstrip() for s in result.stdout]
-            stderrs = [s.rstrip() for s in result.stderr]
-            outputs = result.outputs
-            results = result.get()
-
-        targets = self.active_view.targets
-        if isinstance(targets, int):
-            targets = [targets]
-        elif targets == 'all':
-            targets = self.active_view.client.ids
-
-        # republish stdout:
-        if any(stdouts):
-            for eid,stdout in zip(targets, stdouts):
-                print '[stdout:%2i]' % eid, stdout
-
-        # republish stderr:
-        if any(stderrs):
-            for eid,stderr in zip(targets, stderrs):
-                print >> sys.stderr, '[stderr:%2i]' % eid, stderr
-
-        # republish displaypub output
-        for eid,e_outputs in zip(targets, outputs):
-            for output in e_outputs:
-                md = output['metadata'] or {}
-                md['engine'] = eid
-                self.shell.display_pub.publish(output['source'], output['data'], md)
-
-        # finally, add pyout:
-        for eid,r in zip(targets, results):
-            if r.pyout:
-                display(r)
-
-
     def pxrun_cell(self, raw_cell, store_history=False, silent=False):
         """drop-in replacement for InteractiveShell.run_cell.
 
@@ -295,7 +245,7 @@ class ParallelMagics(Magics):
                 self.shell.showtraceback()
                 return True
             else:
-                self._display_result(result)
+                result.display_outputs()
             return False
 
 
diff --git a/IPython/parallel/client/asyncresult.py b/IPython/parallel/client/asyncresult.py
index 5931b6f..ecffd6e 100644
--- a/IPython/parallel/client/asyncresult.py
+++ b/IPython/parallel/client/asyncresult.py
@@ -21,7 +21,7 @@ from datetime import datetime
 
 from zmq import MessageTracker
 
-from IPython.core.display import clear_output
+from IPython.core.display import clear_output, display
 from IPython.external.decorator import decorator
 from IPython.parallel import error
 
@@ -377,6 +377,149 @@ class AsyncResult(object):
             sys.stdout.flush()
         print
         print "done"
+
+    def _republish_displaypub(self, content, eid):
+        """republish individual displaypub content dicts"""
+        try:
+            ip = get_ipython()
+        except NameError:
+            # displaypub is meaningless outside IPython
+            return
+        md = content['metadata'] or {}
+        md['engine'] = eid
+        ip.display_pub.publish(content['source'], content['data'], md)
+
+
+    def _display_single_result(self):
+        """republish the outputs of a single (non-list) result"""
+        # skip empty streams, so that we don't emit bare newlines
+        if self.stdout:
+            print self.stdout
+        if self.stderr:
+            print >> sys.stderr, self.stderr
+
+        try:
+            get_ipython()
+        except NameError:
+            # displaypub is meaningless outside IPython
+            return
+
+        for output in self.outputs:
+            self._republish_displaypub(output, self.engine_id)
+
+        if self.pyout is not None:
+            display(self.get())
+
+    @check_ready
+    def display_outputs(self, groupby="type"):
+        """republish the outputs of the computation
+
+        Parameters
+        ----------
+
+        groupby : str [default: type]
+            if 'type':
+                Group outputs by type (show all stdout, then all stderr, etc.):
+
+                [stdout:1] foo
+                [stdout:2] foo
+                [stderr:1] bar
+                [stderr:2] bar
+            if 'engine':
+                Display outputs for each engine before moving on to the next:
+
+                [stdout:1] foo
+                [stderr:1] bar
+                [stdout:2] foo
+                [stderr:2] bar
+
+            if 'order':
+                Like 'type', but further collate individual displaypub
+                outputs.  This is meant for cases of each command producing
+                several plots, and you would like to see all of the first
+                plots together, then all of the second plots, and so on.
+
+        Raises
+        ------
+        ValueError
+            if `groupby` is not one of 'type', 'engine', 'order'.
+        """
+        # flush iopub, just in case
+        self._client._flush_iopub(self._client._iopub_socket)
+        if self._single_result:
+            self._display_single_result()
+            return
+
+        stdouts = [s.rstrip() for s in self.stdout]
+        stderrs = [s.rstrip() for s in self.stderr]
+        pyouts = [p for p in self.pyout]
+        output_lists = self.outputs
+        results = self.get()
+
+        targets = self.engine_id
+
+        if groupby == "engine":
+            for eid,stdout,stderr,outputs,r,pyout in zip(
+                    targets, stdouts, stderrs, output_lists, results, pyouts
+                ):
+                if stdout:
+                    print '[stdout:%2i]' % eid, stdout
+                if stderr:
+                    print >> sys.stderr, '[stderr:%2i]' % eid, stderr
+
+                try:
+                    get_ipython()
+                except NameError:
+                    # no displaypub outside IPython; later engines' streams still print
+                    continue
+
+                for output in outputs:
+                    self._republish_displaypub(output, eid)
+
+                if pyout is not None:
+                    display(r)
+
+        elif groupby in ('type', 'order'):
+            # republish stdout:
+            if any(stdouts):
+                for eid,stdout in zip(targets, stdouts):
+                    print '[stdout:%2i]' % eid, stdout
+
+            # republish stderr:
+            if any(stderrs):
+                for eid,stderr in zip(targets, stderrs):
+                    print >> sys.stderr, '[stderr:%2i]' % eid, stderr
+
+            try:
+                get_ipython()
+            except NameError:
+                # displaypub is meaningless outside IPython
+                return
+
+            if groupby == 'order':
+                output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
+                N = max(len(outputs) for outputs in output_lists)
+                for i in range(N):
+                    for eid in targets:
+                        outputs = output_dict[eid]
+                        # an engine may have produced fewer than N outputs
+                        if len(outputs) > i:
+                            self._republish_displaypub(outputs[i], eid)
+            else:
+                # republish displaypub output
+                for eid,outputs in zip(targets, output_lists):
+                    for output in outputs:
+                        self._republish_displaypub(output, eid)
+
+            # finally, add pyout:
+            for eid,r,pyout in zip(targets, results, pyouts):
+                if pyout is not None:
+                    display(r)
+
+        else:
+            raise ValueError("groupby must be one of 'type', 'engine', 'order', not %r" % groupby)
+
+
 
 
 class AsyncMapResult(AsyncResult):