# -*- coding: utf-8 -*-
"""Test Parallel magics

Authors:

* Min RK
"""
#-------------------------------------------------------------------------------
#  Copyright (C) 2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

import re
import sys
import time

import zmq
from nose import SkipTest

from IPython.testing import decorators as dec
from IPython.utils.io import capture_output

from IPython import parallel as pmod
from IPython.parallel import error
from IPython.parallel import AsyncResult
from IPython.parallel.util import interactive

from IPython.parallel.tests import add_engines

from .clienttest import ClusterTestCase, generate_output


def setup():
    """Module-level fixture: request engines before any test runs.

    NOTE(review): ``total=True`` presumably means "bring the cluster up to
    three engines in total" rather than "add three more" -- confirm against
    ``IPython.parallel.tests.add_engines``.
    """
    add_engines(3, total=True)


class TestParallelMagics(ClusterTestCase):
    """Tests for the %px, %%px, %autopx, %pxresult and %pxconfig magics."""

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _check_output_lines(self, output, expected):
        """Assert that every line of `output` matches its entry in `expected`.

        Parameters
        ----------
        output : str
            Captured text.  It must not contain an empty line (``'\\n\\n'``).
        expected : list
            One entry per line of `output`.  Each entry is either a single
            regex string, or a sequence of regex strings that must *all* be
            found (via ``re.search``) in the corresponding line.
        """
        self.assertNotIn('\n\n', output)
        lines = output.splitlines()
        # Pass the raw text as the message so a mismatch shows what was captured.
        self.assertEqual(len(lines), len(expected), output)
        for line, patterns in zip(lines, expected):
            if isinstance(patterns, str):
                patterns = [patterns]
            for pattern in patterns:
                assert re.search(pattern, line) is not None, \
                    "Expected %r in %r" % (pattern, line)

    def _check_generated_stderr(self, stderr, n):
        """Assert that `stderr` holds the expected three-line stderr block
        (header, ``stderr``, ``stderr2``) repeated `n` times.
        """
        expected = [
            r'\[stderr:\d+\]',
            '^stderr$',
            '^stderr2$',
        ] * n
        self._check_output_lines(stderr, expected)

    # ------------------------------------------------------------------
    # %px line magic
    # ------------------------------------------------------------------

    def test_px_blocking(self):
        """%px in blocking mode executes remotely and displays stdout"""
        ip = get_ipython()
        v = self.client[-1:]
        v.activate()
        v.block = True

        ip.magic('px a=5')
        self.assertEqual(v['a'], [5])
        ip.magic('px a=10')
        self.assertEqual(v['a'], [10])
        # just 'print a' works ~99% of the time, but this ensures that
        # the stdout message has arrived when the result is finished:
        with capture_output() as io:
            ip.magic(
                'px import sys,time;print(a);sys.stdout.flush();time.sleep(0.2)'
            )
        self.assertIn('[stdout:', io.stdout)
        self.assertNotIn('\n\n', io.stdout)
        assert io.stdout.rstrip().endswith('10')
        self.assertRaisesRemote(ZeroDivisionError, ip.magic, 'px 1/0')

    def test_px_nonblocking(self):
        """%px in non-blocking mode returns an AsyncResult without output"""
        ip = get_ipython()
        v = self.client[-1:]
        v.activate()
        v.block = False

        ip.magic('px a=5')
        self.assertEqual(v['a'], [5])
        ip.magic('px a=10')
        self.assertEqual(v['a'], [10])
        ip.magic('pxconfig --verbose')
        with capture_output() as io:
            ar = ip.magic('px print (a)')
        self.assertIsInstance(ar, AsyncResult)
        self.assertIn('Async', io.stdout)
        self.assertNotIn('[stdout:', io.stdout)
        self.assertNotIn('\n\n', io.stdout)

        # The remote error only surfaces when the result is fetched.
        ar = ip.magic('px 1/0')
        self.assertRaisesRemote(ZeroDivisionError, ar.get)

    # ------------------------------------------------------------------
    # %%px cell magic
    # ------------------------------------------------------------------

    def test_cellpx_block_args(self):
        """%%px --[no]block flags work"""
        ip = get_ipython()
        v = self.client[-1:]
        v.activate()

        # Exercise both view defaults; the explicit flags must win either way.
        for block in (True, False):
            v.block = block
            ip.magic("pxconfig --verbose")
            with capture_output(display=False) as io:
                ip.run_cell_magic("px", "", "1")
            if block:
                assert io.stdout.startswith("Parallel"), io.stdout
            else:
                assert io.stdout.startswith("Async"), io.stdout

            with capture_output(display=False) as io:
                ip.run_cell_magic("px", "--block", "1")
            assert io.stdout.startswith("Parallel"), io.stdout

            with capture_output(display=False) as io:
                ip.run_cell_magic("px", "--noblock", "1")
            assert io.stdout.startswith("Async"), io.stdout

    def test_cellpx_groupby_engine(self):
        """%%px --group-outputs=engine"""
        ip = get_ipython()
        v = self.client[:]
        v.block = True
        v.activate()

        v['generate_output'] = generate_output

        with capture_output(display=False) as io:
            ip.run_cell_magic('px', '--group-outputs=engine', 'generate_output()')

        # One complete block of output per engine.
        expected = [
            r'\[stdout:\d+\]',
            'stdout',
            'stdout2',
            r'\[output:\d+\]',
            r'IPython\.core\.display\.HTML',
            r'IPython\.core\.display\.Math',
            r'Out\[\d+:\d+\]:.*IPython\.core\.display\.Math',
        ] * len(v)
        self._check_output_lines(io.stdout, expected)

        self._check_generated_stderr(io.stderr, len(v))

    def test_cellpx_groupby_order(self):
        """%%px --group-outputs=order"""
        ip = get_ipython()
        v = self.client[:]
        v.block = True
        v.activate()

        v['generate_output'] = generate_output

        with capture_output(display=False) as io:
            ip.run_cell_magic('px', '--group-outputs=order', 'generate_output()')

        # Each kind of output is listed for every engine before the next kind.
        expected = []
        expected.extend([
            r'\[stdout:\d+\]',
            'stdout',
            'stdout2',
        ] * len(v))
        expected.extend([
            r'\[output:\d+\]',
            r'IPython\.core\.display\.HTML',
        ] * len(v))
        expected.extend([
            r'\[output:\d+\]',
            r'IPython\.core\.display\.Math',
        ] * len(v))
        expected.extend([
            r'Out\[\d+:\d+\]:.*IPython\.core\.display\.Math'
        ] * len(v))
        self._check_output_lines(io.stdout, expected)

        self._check_generated_stderr(io.stderr, len(v))

    def test_cellpx_groupby_type(self):
        """%%px --group-outputs=type"""
        ip = get_ipython()
        v = self.client[:]
        v.block = True
        v.activate()

        v['generate_output'] = generate_output

        with capture_output(display=False) as io:
            ip.run_cell_magic('px', '--group-outputs=type', 'generate_output()')

        expected = []
        expected.extend([
            r'\[stdout:\d+\]',
            'stdout',
            'stdout2',
        ] * len(v))
        expected.extend([
            r'\[output:\d+\]',
            r'IPython\.core\.display\.HTML',
            r'IPython\.core\.display\.Math',
        ] * len(v))
        # A tuple entry means both patterns must be found in the same line.
        expected.extend([
            (r'Out\[\d+:\d+\]', r'IPython\.core\.display\.Math')
        ] * len(v))
        self._check_output_lines(io.stdout, expected)

        self._check_generated_stderr(io.stderr, len(v))

    def test_cellpx_targets(self):
        """%%px --targets doesn't change defaults"""
        ip = get_ipython()
        rc = self.client
        view = rc.activate(rc.ids)
        self.assertEqual(view.targets, rc.ids)
        ip.magic('pxconfig --verbose')
        for cell in ("pass", "1/0"):
            with capture_output(display=False) as io:
                try:
                    ip.run_cell_magic("px", "--targets all", cell)
                except pmod.RemoteError:
                    # Expected for the "1/0" cell; the view's defaults must
                    # be left untouched even when the cell fails.
                    pass
            self.assertIn('engine(s): all', io.stdout)
            self.assertEqual(view.targets, rc.ids)

    def test_cellpx_block(self):
        """%%px --block doesn't change default"""
        ip = get_ipython()
        rc = self.client
        view = rc.activate(rc.ids)
        view.block = False
        self.assertEqual(view.targets, rc.ids)
        ip.magic('pxconfig --verbose')
        for cell in ("pass", "1/0"):
            with capture_output(display=False) as io:
                try:
                    ip.run_cell_magic("px", "--block", cell)
                except pmod.RemoteError:
                    # Expected for the "1/0" cell; the view's default must
                    # be left untouched even when the cell fails.
                    pass
            self.assertNotIn('Async', io.stdout)
            self.assertEqual(view.block, False)

    # ------------------------------------------------------------------
    # %autopx
    # ------------------------------------------------------------------

    def test_autopx_blocking(self):
        """%autopx in blocking mode shows remote results and errors"""
        ip = get_ipython()
        v = self.client[-1]
        v.activate()
        v.block = True

        with capture_output(display=False) as io:
            ip.magic('autopx')
            ip.run_cell('\n'.join(('a=5', 'b=12345', 'c=0')))
            ip.run_cell('b*=2')
            ip.run_cell('print (b)')
            ip.run_cell('b')
            ip.run_cell("b/c")
            ip.magic('autopx')

        output = io.stdout

        assert output.startswith('%autopx enabled'), output
        assert output.rstrip().endswith('%autopx disabled'), output
        self.assertIn('ZeroDivisionError', output)
        self.assertIn('\nOut[', output)
        self.assertIn(': 24690', output)
        ar = v.get_result(-1)
        self.assertEqual(v['a'], 5)
        self.assertEqual(v['b'], 24690)
        self.assertRaisesRemote(ZeroDivisionError, ar.get)

    def test_autopx_nonblocking(self):
        """%autopx in non-blocking mode does not display remote errors"""
        ip = get_ipython()
        v = self.client[-1]
        v.activate()
        v.block = False

        with capture_output() as io:
            ip.magic('autopx')
            ip.run_cell('\n'.join(('a=5', 'b=10', 'c=0')))
            ip.run_cell('print (b)')
            ip.run_cell('import time; time.sleep(0.1)')
            ip.run_cell("b/c")
            ip.run_cell('b*=2')
            ip.magic('autopx')

        output = io.stdout.rstrip()

        assert output.startswith('%autopx enabled'), output
        assert output.endswith('%autopx disabled'), output
        self.assertNotIn('ZeroDivisionError', output)
        ar = v.get_result(-2)
        self.assertRaisesRemote(ZeroDivisionError, ar.get)
        # prevent TaskAborted on pulls, due to ZeroDivisionError
        time.sleep(0.5)
        self.assertEqual(v['a'], 5)
        # b*=2 will not fire, due to abort
        self.assertEqual(v['b'], 10)

    # ------------------------------------------------------------------
    # %pxresult, %pylab, %pxconfig
    # ------------------------------------------------------------------

    def test_result(self):
        """%pxresult displays the value from the preceding %px"""
        ip = get_ipython()
        v = self.client[-1]
        v.activate()
        data = dict(a=111, b=222)
        v.push(data, block=True)

        for name in ('a', 'b'):
            ip.magic('px ' + name)
            with capture_output(display=False) as io:
                ip.magic('pxresult')
            self.assertIn(str(data[name]), io.stdout)

    @dec.skipif_not_matplotlib
    def test_px_pylab(self):
        """%pylab works on engines"""
        ip = get_ipython()
        v = self.client[-1]
        v.block = True
        v.activate()

        with capture_output() as io:
            ip.magic("px %pylab inline")

        self.assertIn(
            "Populating the interactive namespace from numpy and matplotlib",
            io.stdout,
        )

        with capture_output(display=False) as io:
            ip.magic("px plot(rand(100))")
        self.assertIn('Out[', io.stdout)
        self.assertIn('matplotlib.lines', io.stdout)

    def test_pxconfig(self):
        """%pxconfig updates the targets and block settings of its view"""
        ip = get_ipython()
        rc = self.client
        # The '_tst' suffix makes the magic available as %pxconfig_tst.
        v = rc.activate(-1, '_tst')
        self.assertEqual(v.targets, rc.ids[-1])
        ip.magic("%pxconfig_tst -t :")
        self.assertEqual(v.targets, rc.ids)
        ip.magic("%pxconfig_tst -t ::2")
        self.assertEqual(v.targets, rc.ids[::2])
        ip.magic("%pxconfig_tst -t 1::2")
        self.assertEqual(v.targets, rc.ids[1::2])
        ip.magic("%pxconfig_tst -t 1")
        self.assertEqual(v.targets, 1)
        ip.magic("%pxconfig_tst --block")
        self.assertEqual(v.block, True)
        ip.magic("%pxconfig_tst --noblock")
        self.assertEqual(v.block, False)