test_magics.py
342 lines
| 11.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
# -*- coding: utf-8 -*-
"""Test Parallel magics

Authors:

* Min RK
"""
#-------------------------------------------------------------------------------
#  Copyright (C) 2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import sys | ||||
import time | ||||
import zmq | ||||
from nose import SkipTest | ||||
from IPython.testing import decorators as dec | ||||
from IPython.testing.ipunittest import ParametricTestCase | ||||
from IPython import parallel as pmod | ||||
from IPython.parallel import error | ||||
from IPython.parallel import AsyncResult | ||||
from IPython.parallel.util import interactive | ||||
from IPython.parallel.tests import add_engines | ||||
from .clienttest import ClusterTestCase, capture_output, generate_output | ||||
def setup():
    """Module-level fixture: request engines before the tests in this file run.

    Delegates to ``add_engines`` with ``total=True``.
    NOTE(review): presumably ``total=True`` means "make sure 3 engines exist
    overall" rather than "start 3 more" -- confirm against ``add_engines``.
    """
    engine_count = 3
    add_engines(engine_count, total=True)
class TestParallelMagics(ClusterTestCase, ParametricTestCase):
    """Exercise the parallel magics (%px, %%px, %autopx, %result).

    Every test activates a view obtained from ``self.client`` and drives the
    magics through the interactive shell returned by ``get_ipython()``,
    inspecting the output captured by ``capture_output``.
    """

    #---------------------------------------------------------------------------
    # Shared helpers (names deliberately do not start with "test")
    #---------------------------------------------------------------------------

    def _assert_lines_contain(self, lines, expected, raw):
        """Check captured output line-by-line against expected fragments.

        ``expected`` holds one entry per output line: either a single string
        or a tuple of strings, each of which must occur in the corresponding
        line.  ``raw`` is the complete captured text; it is used as the
        failure message when the number of lines does not match.
        """
        self.assertEqual(len(lines), len(expected), raw)
        for line, expect in zip(lines, expected):
            if isinstance(expect, str):
                # normalize a lone fragment to a one-element sequence
                expect = [expect]
            for ex in expect:
                self.assertTrue(ex in line, "Expected %r in %r" % (ex, line))

    def _run_grouped_generate_output(self, group):
        """Run ``generate_output()`` via ``%%px --group-outputs=<group>``.

        Uses a blocking view over ``self.client[:]`` and returns the tuple
        ``(view, io)`` where ``io`` is the captured stdout/stderr.
        """
        ip = get_ipython()
        v = self.client[:]
        v.block = True
        v.activate()

        # make the output generator available on the engines
        v['generate_output'] = generate_output

        with capture_output() as io:
            ip.run_cell_magic(
                'px', '--group-outputs=%s' % group, 'generate_output()'
            )
        return v, io

    def _assert_grouped_stderr(self, io, n_engines):
        """Check that stderr holds the two expected lines once per engine."""
        expected = [
            ('[stderr:', '] stderr'),
            'stderr2',
        ] * n_engines
        lines = io.stderr.strip().splitlines()
        self._assert_lines_contain(lines, expected, io.stderr)

    #---------------------------------------------------------------------------
    # %px
    #---------------------------------------------------------------------------

    def test_px_blocking(self):
        """%px on a blocking view runs remotely and relays stdout/errors."""
        ip = get_ipython()
        v = self.client[-1:]
        v.activate()
        v.block = True

        ip.magic('px a=5')
        self.assertEqual(v['a'], [5])
        ip.magic('px a=10')
        self.assertEqual(v['a'], [10])
        # just 'print a' works ~99% of the time, but this ensures that
        # the stdout message has arrived when the result is finished:
        with capture_output() as io:
            ip.magic(
                'px import sys,time;print(a);sys.stdout.flush();time.sleep(0.2)'
            )
        out = io.stdout
        self.assertTrue('[stdout:' in out, out)
        self.assertTrue(out.rstrip().endswith('10'))
        self.assertRaisesRemote(ZeroDivisionError, ip.magic, 'px 1/0')

    def test_cellpx_block_args(self):
        """%%px --[no]block flags work"""
        ip = get_ipython()
        v = self.client[-1:]
        v.activate()
        v.block = False

        for block in (True, False):
            v.block = block

            # no flag: the view's own ``block`` setting decides
            with capture_output() as io:
                ip.run_cell_magic("px", "", "1")
            if block:
                self.assertTrue(io.stdout.startswith("Parallel"), io.stdout)
            else:
                self.assertTrue(io.stdout.startswith("Async"), io.stdout)

            # explicit flags override the view's setting
            with capture_output() as io:
                ip.run_cell_magic("px", "--block", "1")
            self.assertTrue(io.stdout.startswith("Parallel"), io.stdout)

            with capture_output() as io:
                ip.run_cell_magic("px", "--noblock", "1")
            self.assertTrue(io.stdout.startswith("Async"), io.stdout)

    def test_cellpx_groupby_engine(self):
        """%%px --group-outputs=engine"""
        v, io = self._run_grouped_generate_output('engine')

        # The first stdout line is skipped; presumably it is the execution
        # banner (blocking runs start with "Parallel", see
        # test_cellpx_block_args).
        lines = io.stdout.strip().splitlines()[1:]
        # all output of one engine is emitted together, engine after engine
        expected = [
            ('[stdout:', '] stdout'),
            'stdout2',
            'IPython.core.display.HTML',
            'IPython.core.display.Math',
            ('] Out[', 'IPython.core.display.Math'),
        ] * len(v)
        self._assert_lines_contain(lines, expected, io.stdout)

        self._assert_grouped_stderr(io, len(v))

    def test_cellpx_groupby_order(self):
        """%%px --group-outputs=order"""
        v, io = self._run_grouped_generate_output('order')

        # first stdout line skipped, as in test_cellpx_groupby_engine
        lines = io.stdout.strip().splitlines()[1:]
        # each individual output is emitted for every engine before the next
        expected = []
        expected.extend([
            ('[stdout:', '] stdout'),
            'stdout2',
        ] * len(v))
        expected.extend([
            'IPython.core.display.HTML',
        ] * len(v))
        expected.extend([
            'IPython.core.display.Math',
        ] * len(v))
        expected.extend([
            ('] Out[', 'IPython.core.display.Math'),
        ] * len(v))
        self._assert_lines_contain(lines, expected, io.stdout)

        self._assert_grouped_stderr(io, len(v))

    def test_cellpx_groupby_atype(self):
        """%%px --group-outputs=type"""
        v, io = self._run_grouped_generate_output('type')

        # first stdout line skipped, as in test_cellpx_groupby_engine
        lines = io.stdout.strip().splitlines()[1:]
        # outputs are grouped by kind: stdout, then displays, then Out[]
        expected = []
        expected.extend([
            ('[stdout:', '] stdout'),
            'stdout2',
        ] * len(v))
        expected.extend([
            'IPython.core.display.HTML',
            'IPython.core.display.Math',
        ] * len(v))
        expected.extend([
            ('] Out[', 'IPython.core.display.Math'),
        ] * len(v))
        self._assert_lines_contain(lines, expected, io.stdout)

        self._assert_grouped_stderr(io, len(v))

    def test_px_nonblocking(self):
        """%px on a non-blocking view returns an AsyncResult."""
        ip = get_ipython()
        v = self.client[-1:]
        v.activate()
        v.block = False

        ip.magic('px a=5')
        self.assertEqual(v['a'], [5])
        ip.magic('px a=10')
        self.assertEqual(v['a'], [10])
        with capture_output() as io:
            ar = ip.magic('px print (a)')
        self.assertTrue(isinstance(ar, AsyncResult))
        self.assertTrue('Async' in io.stdout)
        # the remote stdout must not be relayed in non-blocking mode
        self.assertFalse('[stdout:' in io.stdout)
        ar = ip.magic('px 1/0')
        # the remote error only surfaces when the result is requested
        self.assertRaisesRemote(ZeroDivisionError, ar.get)

    #---------------------------------------------------------------------------
    # %autopx
    #---------------------------------------------------------------------------

    def test_autopx_blocking(self):
        """%autopx with a blocking view runs cells remotely and shows output."""
        ip = get_ipython()
        v = self.client[-1]
        v.activate()
        v.block = True

        with capture_output() as io:
            ip.magic('autopx')  # toggle on
            ip.run_cell('\n'.join(('a=5', 'b=12345', 'c=0')))
            ip.run_cell('b*=2')
            ip.run_cell('print (b)')
            ip.run_cell('b')
            ip.run_cell("b/c")
            ip.magic('autopx')  # toggle off

        output = io.stdout.strip()

        self.assertTrue(output.startswith('%autopx enabled'), output)
        self.assertTrue(output.endswith('%autopx disabled'), output)
        self.assertTrue('RemoteError: ZeroDivisionError' in output, output)
        self.assertTrue('] Out[' in output, output)
        self.assertTrue(': 24690' in output, output)
        ar = v.get_result(-1)
        self.assertEqual(v['a'], 5)
        self.assertEqual(v['b'], 24690)
        self.assertRaisesRemote(ZeroDivisionError, ar.get)

    def test_autopx_nonblocking(self):
        """%autopx with a non-blocking view does not display remote errors."""
        ip = get_ipython()
        v = self.client[-1]
        v.activate()
        v.block = False

        with capture_output() as io:
            ip.magic('autopx')  # toggle on
            ip.run_cell('\n'.join(('a=5', 'b=10', 'c=0')))
            ip.run_cell('print (b)')
            ip.run_cell('import time; time.sleep(0.1)')
            ip.run_cell("b/c")
            ip.run_cell('b*=2')
            ip.magic('autopx')  # toggle off

        output = io.stdout.strip()

        self.assertTrue(output.startswith('%autopx enabled'))
        self.assertTrue(output.endswith('%autopx disabled'))
        self.assertFalse('ZeroDivisionError' in output)
        # -2: the failing "b/c" cell, one before the final "b*=2"
        ar = v.get_result(-2)
        self.assertRaisesRemote(ZeroDivisionError, ar.get)
        # prevent TaskAborted on pulls, due to ZeroDivisionError
        time.sleep(0.5)
        self.assertEqual(v['a'], 5)
        # b*=2 will not fire, due to abort
        self.assertEqual(v['b'], 10)

    #---------------------------------------------------------------------------
    # %result
    #---------------------------------------------------------------------------

    def test_result(self):
        """%result [index] shows the output of the selected earlier %px call."""
        ip = get_ipython()
        v = self.client[-1]
        v.activate()
        data = dict(a=111, b=222)
        v.push(data, block=True)

        ip.magic('px a')
        ip.magic('px b')
        # (argument given to %result, name whose value must be displayed)
        cases = [
            ('', 'b'),
            ('-1', 'b'),
            ('2', 'b'),
            ('1', 'a'),
            ('-2', 'a'),
        ]
        for idx, name in cases:
            with capture_output() as io:
                ip.magic('result ' + idx)
            output = io.stdout.strip()
            msg = "expected %s output to include %s, but got: %s" % \
                ('%result ' + idx, str(data[name]), output)
            self.assertTrue(str(data[name]) in output, msg)

    #---------------------------------------------------------------------------
    # %pylab on engines
    #---------------------------------------------------------------------------

    @dec.skipif_not_matplotlib
    def test_px_pylab(self):
        """%pylab works on engines"""
        ip = get_ipython()
        v = self.client[-1]
        v.block = True
        v.activate()

        with capture_output() as io:
            ip.magic("px %pylab inline")

        self.assertTrue("Welcome to pylab" in io.stdout, io.stdout)
        self.assertTrue("backend_inline" in io.stdout, io.stdout)

        with capture_output() as io:
            ip.magic("px plot(rand(100))")

        self.assertTrue('] Out[' in io.stdout, io.stdout)
        self.assertTrue('matplotlib.lines' in io.stdout, io.stdout)