# -*- coding: utf-8 -*-
"""test View objects

Authors:

* Min RK
"""
#-------------------------------------------------------------------------------
#  Copyright (C) 2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------

import os
import sys
import platform
import time
from collections import namedtuple
from tempfile import mktemp
from StringIO import StringIO

import zmq
from nose import SkipTest
from nose.plugins.attrib import attr

from IPython.testing import decorators as dec
from IPython.testing.ipunittest import ParametricTestCase
from IPython.utils.io import capture_output

from IPython import parallel as pmod
from IPython.parallel import error
from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
from IPython.parallel import DirectView
from IPython.parallel.util import interactive

from IPython.parallel.tests import add_engines

from .clienttest import ClusterTestCase, crash, wait, skip_without

def setup():
    """Module-level test fixture: request engines for this test module.

    Delegates to ``add_engines(3, total=True)``.
    NOTE(review): presumably ``total=True`` means "make sure three engines
    exist in total" rather than "start three more" -- confirm against
    ``IPython.parallel.tests.add_engines``.
    """
    add_engines(3, total=True)

# Kept at module level on purpose: test_return_namedtuple imports it by name
# from IPython.parallel.tests.test_view inside the function it sends to an
# engine.
point = namedtuple("point", "x y")

class TestView(ClusterTestCase, ParametricTestCase):
    """Tests for the view objects obtained from ``self.client``.

    Every test talks to the engines reachable through ``self.client``
    (provided by ``ClusterTestCase``), so none of them can run without a
    live test cluster.
    """

    def setUp(self):
        """Pause on Windows XP before running the inherited ``setUp``."""
        # On Win XP, wait for resource cleanup, else parallel test group fails
        if platform.system() == "Windows" and platform.win32_ver()[0] == "XP":
            # 1 sec fails. 1.5 sec seems ok. Using 2 sec for margin of safety
            time.sleep(2)
        super(TestView, self).setUp()

    @attr('crash')
    def test_z_crash_mux(self):
        """test graceful handling of engine death (direct)"""
        eid = self.client.ids[-1]
        ar = self.client[eid].apply_async(crash)
        self.assertRaisesRemote(error.EngineError, ar.get, 10)
        eid = ar.engine_id
        # poll for up to 5 seconds until the client drops the dead engine
        tic = time.time()
        while eid in self.client.ids and time.time() - tic < 5:
            time.sleep(.01)
            self.client.spin()
        self.assertFalse(eid in self.client.ids, "Engine should have died")

    def test_push_pull(self):
        """test pushing and pulling"""
        data = dict(a=10, b=1.05, c=range(10), d={'e': (1, 2), 'f': 'hi'})
        t = self.client.ids[-1]
        v = self.client[t]
        push = v.push
        pull = v.pull
        v.block = True
        nengines = len(self.client)

        # single engine, blocking
        push({'data': data})
        d = pull('data')
        self.assertEqual(d, data)

        # all engines, blocking pull
        self.client[:].push({'data': data})
        d = self.client[:].pull('data', block=True)
        self.assertEqual(d, nengines * [data])

        # non-blocking calls hand back AsyncResults
        ar = push({'data': data}, block=False)
        self.assertTrue(isinstance(ar, AsyncResult))
        r = ar.get()
        ar = self.client[:].pull('data', block=False)
        self.assertTrue(isinstance(ar, AsyncResult))
        r = ar.get()
        self.assertEqual(r, nengines * [data])

        # pulling several names at once
        self.client[:].push(dict(a=10, b=20))
        r = self.client[:].pull(('a', 'b'), block=True)
        self.assertEqual(r, nengines * [[10, 20]])

    def test_push_pull_function(self):
        """test pushing and pulling functions"""
        def testf(x):
            return 2.0 * x

        t = self.client.ids[-1]
        v = self.client[t]
        v.block = True
        push = v.push
        pull = v.pull
        execute = v.execute

        push({'testf': testf})
        r = pull('testf')
        self.assertEqual(r(1.0), testf(1.0))
        execute('r = testf(10)')
        r = pull('r')
        self.assertEqual(r, testf(10))

        ar = self.client[:].push({'testf': testf}, block=False)
        ar.get()
        ar = self.client[:].pull('testf', block=False)
        rlist = ar.get()
        for r in rlist:
            self.assertEqual(r(1.0), testf(1.0))

        execute("def g(x): return x*x")
        r = pull(('testf', 'g'))
        self.assertEqual((r[0](10), r[1](10)), (testf(10), 100))

    def test_push_function_globals(self):
        """test that pushed functions have access to globals"""
        @interactive
        def geta():
            return a

        v = self.client[-1]
        v.block = True
        v['f'] = geta
        # 'a' does not exist remotely yet, so calling f() must fail
        self.assertRaisesRemote(NameError, v.execute, 'b=f()')
        v.execute('a=5')
        v.execute('b=f()')
        self.assertEqual(v['b'], 5)

    def test_push_function_defaults(self):
        """test that pushed functions preserve default args"""
        def echo(a=10):
            return a

        v = self.client[-1]
        v.block = True
        v['f'] = echo
        v.execute('b=f()')
        self.assertEqual(v['b'], 10)

    def test_get_result(self):
        """test getting results from the Hub."""
        c = pmod.Client(profile='iptest')
        # close the extra client even when an assertion below fails
        try:
            t = c.ids[-1]
            v = c[t]
            v2 = self.client[t]
            ar = v.apply_async(wait, 1)
            # give the monitor time to notice the message
            time.sleep(.25)
            ahr = v2.get_result(ar.msg_ids[0])
            self.assertTrue(isinstance(ahr, AsyncHubResult))
            self.assertEqual(ahr.get(), ar.get())
            ar2 = v2.get_result(ar.msg_ids[0])
            self.assertFalse(isinstance(ar2, AsyncHubResult))
            c.spin()
        finally:
            c.close()

    def test_run_newline(self):
        """test that run appends newline to files"""
        tmpfile = mktemp()
        try:
            with open(tmpfile, 'w') as f:
                # NOTE(review): the literal presumably ends on a
                # whitespace-only line without a trailing newline, which is
                # the case the docstring refers to -- confirm against the
                # pristine file.
                f.write("""def g():
                return 5
            """)
            v = self.client[-1]
            v.run(tmpfile, block=True)
        finally:
            # do not leave the scratch file behind
            if os.path.exists(tmpfile):
                os.remove(tmpfile)
        self.assertEqual(v.apply_sync(lambda f: f(), pmod.Reference('g')), 5)

    def test_apply_tracked(self):
        """test tracking for apply"""
        t = self.client.ids[-1]
        v = self.client[t]
        v.block = False

        def echo(n=1024*1024, **kwargs):
            with v.temp_flags(**kwargs):
                return v.apply(lambda x: x, 'x' * n)

        ar = echo(1, track=False)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertTrue(ar.sent)
        ar = echo(track=True)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertEqual(ar.sent, ar._tracker.done)
        ar._tracker.wait()
        self.assertTrue(ar.sent)

    def test_push_tracked(self):
        """test tracking for push"""
        t = self.client.ids[-1]
        ns = dict(x='x' * 1024 * 1024)
        v = self.client[t]
        ar = v.push(ns, block=False, track=False)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertTrue(ar.sent)

        ar = v.push(ns, block=False, track=True)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        ar._tracker.wait()
        self.assertEqual(ar.sent, ar._tracker.done)
        self.assertTrue(ar.sent)
        ar.get()

    def test_scatter_tracked(self):
        """test tracking for scatter"""
        t = self.client.ids
        x = 'x' * 1024 * 1024
        ar = self.client[t].scatter('x', x, block=False, track=False)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertTrue(ar.sent)

        ar = self.client[t].scatter('x', x, block=False, track=True)
        self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
        self.assertEqual(ar.sent, ar._tracker.done)
        ar._tracker.wait()
        self.assertTrue(ar.sent)
        ar.get()

    def test_remote_reference(self):
        """a Reference argument resolves to the remote value"""
        v = self.client[-1]
        v['a'] = 123
        ra = pmod.Reference('a')
        b = v.apply_sync(lambda x: x, ra)
        self.assertEqual(b, 123)

    def test_scatter_gather(self):
        """scatter then gather round-trips a sequence; unknown names raise"""
        view = self.client[:]
        seq1 = range(16)
        view.scatter('a', seq1)
        seq2 = view.gather('a', block=True)
        self.assertEqual(seq2, seq1)
        self.assertRaisesRemote(NameError, view.gather, 'asdf', block=True)

    @skip_without('numpy')
    def test_scatter_gather_numpy(self):
        """scatter then gather round-trips a numpy array"""
        import numpy
        from numpy.testing import assert_array_equal
        view = self.client[:]
        a = numpy.arange(64)
        view.scatter('a', a, block=True)
        b = view.gather('a', block=True)
        assert_array_equal(b, a)

    def test_scatter_gather_lazy(self):
        """scatter/gather with targets='all'"""
        view = self.client.direct_view(targets='all')
        x = range(64)
        view.scatter('x', x)
        gathered = view.gather('x', block=True)
        self.assertEqual(gathered, x)

    @dec.known_failure_py3
    @skip_without('numpy')
    def test_push_numpy_nocopy(self):
        """arrays pushed to engines arrive non-writeable"""
        import numpy
        view = self.client[:]
        a = numpy.arange(64)
        view['A'] = a

        @interactive
        def check_writeable(x):
            return x.flags.writeable

        for flag in view.apply_sync(check_writeable, pmod.Reference('A')):
            self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")

        view.push(dict(B=a))
        for flag in view.apply_sync(check_writeable, pmod.Reference('B')):
            self.assertFalse(flag, "array is writeable, push shouldn't have pickled it")

    @skip_without('numpy')
    def test_apply_numpy(self):
        """view.apply(f, ndarray)"""
        import numpy
        from numpy.testing import assert_array_equal
        A = numpy.random.random((100, 100))
        view = self.client[-1]
        for dt in ['int32', 'uint8', 'float32', 'float64']:
            B = A.astype(dt)
            C = view.apply_sync(lambda x: x, B)
            assert_array_equal(B, C)

    @skip_without('numpy')
    def test_push_pull_recarray(self):
        """push/pull recarrays"""
        import numpy
        from numpy.testing import assert_array_equal
        view = self.client[-1]
        R = numpy.array([
            (1, 'hi', 0.),
            (2**30, 'there', 2.5),
            (-99999, 'world', -12345.6789),
        ], [('n', int), ('s', '|S10'), ('f', float)])

        view['RR'] = R
        R2 = view['RR']
        r_dtype, r_shape = view.apply_sync(interactive(lambda: (RR.dtype, RR.shape)))
        self.assertEqual(r_dtype, R.dtype)
        self.assertEqual(r_shape, R.shape)
        self.assertEqual(R2.dtype, R.dtype)
        self.assertEqual(R2.shape, R.shape)
        assert_array_equal(R2, R)

    @skip_without('pandas')
    def test_push_pull_timeseries(self):
        """push/pull pandas.TimeSeries"""
        import pandas
        ts = pandas.TimeSeries(range(10))
        view = self.client[-1]
        view.push(dict(ts=ts), block=True)
        rts = view['ts']
        self.assertEqual(type(rts), type(ts))
        self.assertTrue((ts == rts).all())

    def test_map(self):
        """map_sync over all engines matches the builtin map"""
        view = self.client[:]

        def f(x):
            return x**2

        data = range(16)
        r = view.map_sync(f, data)
        # list(...) keeps the comparison meaningful where map() is lazy
        self.assertEqual(r, list(map(f, data)))

    def test_map_iterable(self):
        """test map on iterables (direct)"""
        view = self.client[:]
        # 101 is prime, so it won't be evenly distributed
        arr = range(101)
        # ensure it will be an iterator, even in Python 3
        it = iter(arr)
        # hand over the iterator itself; passing `arr` would only exercise
        # the plain-sequence path
        r = view.map_sync(lambda x: x, it)
        self.assertEqual(r, list(arr))

    def test_scatter_gather_nonblocking(self):
        """non-blocking scatter followed by non-blocking gather"""
        data = range(16)
        view = self.client[:]
        view.scatter('a', data, block=False)
        ar = view.gather('a', block=False)
        self.assertEqual(ar.get(), data)

    @skip_without('numpy')
    def test_scatter_gather_numpy_nonblocking(self):
        """non-blocking scatter/gather of a numpy array"""
        import numpy
        from numpy.testing import assert_array_equal
        a = numpy.arange(64)
        view = self.client[:]
        ar = view.scatter('a', a, block=False)
        self.assertTrue(isinstance(ar, AsyncResult))
        amr = view.gather('a', block=False)
        self.assertTrue(isinstance(amr, AsyncMapResult))
        assert_array_equal(amr.get(), a)

    def test_execute(self):
        """non-blocking execute returns an AsyncResult with one reply per engine"""
        view = self.client[:]
        execute = view.execute
        ar = execute('c=30', block=False)
        self.assertTrue(isinstance(ar, AsyncResult))
        ar = execute('d=[0,1,2]', block=False)
        self.client.wait(ar, 1)
        self.assertEqual(len(ar.get()), len(self.client))
        for c in view['c']:
            self.assertEqual(c, 30)

    def test_abort(self):
        """abort accepts an AsyncResult as well as a list of msg_ids"""
        view = self.client[-1]
        # keep the engine busy so the next two tasks are still queued
        ar = view.execute('import time; time.sleep(1)', block=False)
        ar2 = view.apply_async(lambda: 2)
        ar3 = view.apply_async(lambda: 3)
        view.abort(ar2)
        view.abort(ar3.msg_ids)
        self.assertRaises(error.TaskAborted, ar2.get)
        self.assertRaises(error.TaskAborted, ar3.get)

    def test_abort_all(self):
        """view.abort() aborts all outstanding tasks"""
        view = self.client[-1]
        ars = [view.apply_async(time.sleep, 0.25) for i in range(10)]
        view.abort()
        view.wait(timeout=5)
        for ar in ars[5:]:
            self.assertRaises(error.TaskAborted, ar.get)

    def test_temp_flags(self):
        """temp_flags overrides block only inside the with-block"""
        view = self.client[-1]
        view.block = True
        with view.temp_flags(block=False):
            self.assertFalse(view.block)
        self.assertTrue(view.block)

    @dec.known_failure_py3
    def test_importer(self):
        """modules imported under view.importer are usable on the engine"""
        view = self.client[-1]
        view.clear(block=True)
        with view.importer:
            import re

        @interactive
        def findall(pat, s):
            # this globals() step isn't necessary in real code
            # only to prevent a closure in the test
            re = globals()['re']
            return re.findall(pat, s)

        self.assertEqual(view.apply_sync(findall, r'\w+', 'hello world'), 'hello world'.split())

    def test_unicode_execute(self):
        """test executing unicode strings"""
        v = self.client[-1]
        v.block = True
        if sys.version_info[0] >= 3:
            code = "a='é'"
        else:
            code = u"a=u'é'"
        v.execute(code)
        self.assertEqual(v['a'], u'é')

    def test_unicode_apply_result(self):
        """test unicode apply results"""
        v = self.client[-1]
        r = v.apply_sync(lambda: u'é')
        self.assertEqual(r, u'é')

    def test_unicode_apply_arg(self):
        """test passing unicode arguments to apply"""
        v = self.client[-1]

        @interactive
        def check_unicode(a, check):
            assert isinstance(a, unicode), "%r is not unicode"%a
            assert isinstance(check, bytes), "%r is not bytes"%check
            assert a.encode('utf8') == check, "%s != %s"%(a,check)

        for s in [u'é', u'ßø®∫', u'asdf']:
            try:
                v.apply_sync(check_unicode, s, s.encode('utf8'))
            except error.RemoteError as e:
                if e.ename == 'AssertionError':
                    # a remote assertion is a test failure, not an error
                    self.fail(e.evalue)
                else:
                    # bare raise keeps the original traceback
                    raise

    def test_map_reference(self):
        """view.map(<Reference>, *seqs) should work"""
        v = self.client[:]
        v.scatter('n', self.client.ids, flatten=True)
        v.execute("f = lambda x,y: x*y")
        rf = pmod.Reference('f')
        nlist = list(range(10))
        mlist = nlist[::-1]
        expected = [m * n for m, n in zip(mlist, nlist)]
        result = v.map_sync(rf, mlist, nlist)
        self.assertEqual(result, expected)

    def test_apply_reference(self):
        """view.apply(<Reference>, *args) should work"""
        v = self.client[:]
        v.scatter('n', self.client.ids, flatten=True)
        v.execute("f = lambda x: n*x")
        rf = pmod.Reference('f')
        result = v.apply_sync(rf, 5)
        expected = [5 * eid for eid in self.client.ids]
        self.assertEqual(result, expected)

    def test_eval_reference(self):
        """a Reference may hold an expression such as 'g[0]'"""
        v = self.client[self.client.ids[0]]
        v['g'] = range(5)
        rg = pmod.Reference('g[0]')
        echo = lambda x: x
        self.assertEqual(v.apply_sync(echo, rg), 0)

    def test_reference_nameerror(self):
        """a Reference to an undefined name raises a remote NameError"""
        v = self.client[self.client.ids[0]]
        r = pmod.Reference('elvis_has_left')
        echo = lambda x: x
        self.assertRaisesRemote(NameError, v.apply_sync, echo, r)

    def test_single_engine_map(self):
        """map_sync works on a view of a single engine"""
        e0 = self.client[self.client.ids[0]]
        r = range(5)
        check = [-1 * i for i in r]
        result = e0.map_sync(lambda x: -1 * x, r)
        self.assertEqual(result, check)

    def test_len(self):
        """len(view) makes sense"""
        e0 = self.client[self.client.ids[0]]
        yield self.assertEqual(len(e0), 1)
        v = self.client[:]
        yield self.assertEqual(len(v), len(self.client.ids))
        v = self.client.direct_view('all')
        yield self.assertEqual(len(v), len(self.client.ids))
        v = self.client[:2]
        yield self.assertEqual(len(v), 2)
        v = self.client[:1]
        yield self.assertEqual(len(v), 1)
        v = self.client.load_balanced_view()
        yield self.assertEqual(len(v), len(self.client.ids))
        # parametric tests seem to require manual closing?
        self.client.close()

    # begin execute tests

    def test_execute_reply(self):
        """ExecuteReply has a readable str() and carries pyout data"""
        e0 = self.client[self.client.ids[0]]
        e0.block = True
        ar = e0.execute("5", silent=False)
        er = ar.get()
        self.assertEqual(str(er), "<ExecuteReply[%i]: 5>" % er.execution_count)
        self.assertEqual(er.pyout['data']['text/plain'], '5')

    def test_execute_reply_stdout(self):
        """ExecuteReply exposes captured stdout"""
        e0 = self.client[self.client.ids[0]]
        e0.block = True
        ar = e0.execute("print (5)", silent=False)
        er = ar.get()
        self.assertEqual(er.stdout.strip(), '5')

    def test_execute_pyout(self):
        """execute triggers pyout with silent=False"""
        view = self.client[:]
        ar = view.execute("5", silent=False, block=True)

        expected = [{'text/plain': '5'}] * len(view)
        mimes = [out['data'] for out in ar.pyout]
        self.assertEqual(mimes, expected)

    def test_execute_silent(self):
        """execute does not trigger pyout with silent=True"""
        view = self.client[:]
        # spell out silent=True so the test states what it checks
        ar = view.execute("5", silent=True, block=True)
        expected = [None] * len(view)
        self.assertEqual(ar.pyout, expected)

    def test_execute_magic(self):
        """execute accepts IPython commands"""
        view = self.client[:]
        view.execute("a = 5")
        ar = view.execute("%whos", block=True)
        # this will raise, if that failed
        ar.get(5)
        for stdout in ar.stdout:
            lines = stdout.splitlines()
            self.assertEqual(lines[0].split(), ['Variable', 'Type', 'Data/Info'])
            # rows start after the header and its separator line
            found = any(line.split() == ['a', 'int', '5'] for line in lines[2:])
            self.assertTrue(found, "whos output wrong: %s" % stdout)

    def test_execute_displaypub(self):
        """execute tracks display_pub output"""
        view = self.client[:]
        view.execute("from IPython.core.display import *")
        ar = view.execute("[ display(i) for i in range(5) ]", block=True)

        expected = [{u'text/plain': unicode(j)} for j in range(5)]
        for outputs in ar.outputs:
            mimes = [out['data'] for out in outputs]
            self.assertEqual(mimes, expected)

    def test_apply_displaypub(self):
        """apply tracks display_pub output"""
        view = self.client[:]
        view.execute("from IPython.core.display import *")

        @interactive
        def publish():
            [display(i) for i in range(5)]

        ar = view.apply_async(publish)
        ar.get(5)
        expected = [{u'text/plain': unicode(j)} for j in range(5)]
        for outputs in ar.outputs:
            mimes = [out['data'] for out in outputs]
            self.assertEqual(mimes, expected)

    def test_execute_raises(self):
        """exceptions in execute requests raise appropriately"""
        view = self.client[-1]
        ar = view.execute("1/0")
        self.assertRaisesRemote(ZeroDivisionError, ar.get, 2)

    def test_remoteerror_render_exception(self):
        """RemoteErrors get nice tracebacks"""
        view = self.client[-1]
        ar = view.execute("1/0")
        ip = get_ipython()
        ip.user_ns['ar'] = ar
        with capture_output() as io:
            ip.run_cell("ar.get(2)")

        self.assertTrue('ZeroDivisionError' in io.stdout, io.stdout)

    def test_compositeerror_render_exception(self):
        """CompositeErrors get nice tracebacks"""
        view = self.client[:]
        ar = view.execute("1/0")
        ip = get_ipython()
        ip.user_ns['ar'] = ar

        with capture_output() as io:
            ip.run_cell("ar.get(2)")

        count = min(error.CompositeError.tb_limit, len(view))

        self.assertEqual(io.stdout.count('ZeroDivisionError'), count * 2, io.stdout)
        self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
        self.assertEqual(io.stdout.count(':execute'), count, io.stdout)

    def test_compositeerror_truncate(self):
        """Truncate CompositeErrors with many exceptions"""
        view = self.client[:]
        msg_ids = []
        for i in range(10):
            ar = view.execute("1/0")
            msg_ids.extend(ar.msg_ids)

        ar = self.client.get_result(msg_ids)
        try:
            ar.get()
        except error.CompositeError as _e:
            # rebind: the `as` target does not outlive the except block on
            # Python 3
            e = _e
        else:
            self.fail("Should have raised CompositeError")

        lines = e.render_traceback()
        with capture_output() as io:
            e.print_traceback()

        self.assertTrue("more exceptions" in lines[-1])
        count = e.tb_limit

        self.assertEqual(io.stdout.count('ZeroDivisionError'), 2 * count, io.stdout)
        self.assertEqual(io.stdout.count('by zero'), count, io.stdout)
        self.assertEqual(io.stdout.count(':execute'), count, io.stdout)

    @dec.skipif_not_matplotlib
    def test_magic_pylab(self):
        """%pylab works on engines"""
        view = self.client[-1]
        ar = view.execute("%pylab inline")
        # at least check if this raised:
        reply = ar.get(5)
        # include imports, in case user config
        ar = view.execute("plot(rand(100))", silent=False)
        reply = ar.get(5)
        self.assertEqual(len(reply.outputs), 1)
        output = reply.outputs[0]
        self.assertTrue("data" in output)
        data = output['data']
        self.assertTrue("image/png" in data)

    def test_func_default_func(self):
        """interactively defined function as apply func default"""
        def foo():
            return 'foo'

        def bar(f=foo):
            return f()

        view = self.client[-1]
        ar = view.apply_async(bar)
        r = ar.get(10)
        self.assertEqual(r, 'foo')

    def test_data_pub_single(self):
        """ar.data on a single-engine view holds the last published dict"""
        view = self.client[-1]
        ar = view.execute('\n'.join([
            'from IPython.kernel.zmq.datapub import publish_data',
            'for i in range(5):',
            ' publish_data(dict(i=i))'
        ]), block=False)
        self.assertTrue(isinstance(ar.data, dict))
        ar.get(5)
        self.assertEqual(ar.data, dict(i=4))

    def test_data_pub(self):
        """ar.data on a multi-engine view holds one dict per engine"""
        view = self.client[:]
        ar = view.execute('\n'.join([
            'from IPython.kernel.zmq.datapub import publish_data',
            'for i in range(5):',
            ' publish_data(dict(i=i))'
        ]), block=False)
        self.assertTrue(all(isinstance(d, dict) for d in ar.data))
        ar.get(5)
        self.assertEqual(ar.data, [dict(i=4)] * len(ar))

    def test_can_list_arg(self):
        """args in lists are canned"""
        view = self.client[-1]
        view['a'] = 128
        rA = pmod.Reference('a')
        ar = view.apply_async(lambda x: x, [rA])
        r = ar.get(5)
        self.assertEqual(r, [128])

    def test_can_dict_arg(self):
        """args in dicts are canned"""
        view = self.client[-1]
        view['a'] = 128
        rA = pmod.Reference('a')
        ar = view.apply_async(lambda x: x, dict(foo=rA))
        r = ar.get(5)
        self.assertEqual(r, dict(foo=128))

    def test_can_list_kwarg(self):
        """kwargs in lists are canned"""
        view = self.client[-1]
        view['a'] = 128
        rA = pmod.Reference('a')
        ar = view.apply_async(lambda x=5: x, x=[rA])
        r = ar.get(5)
        self.assertEqual(r, [128])

    def test_can_dict_kwarg(self):
        """kwargs in dicts are canned"""
        view = self.client[-1]
        view['a'] = 128
        rA = pmod.Reference('a')
        # pass the dict by keyword, mirroring test_can_list_kwarg; passing
        # it positionally would only repeat test_can_dict_arg
        ar = view.apply_async(lambda x=5: x, x=dict(foo=rA))
        r = ar.get(5)
        self.assertEqual(r, dict(foo=128))

    def test_map_ref(self):
        """view.map works with references"""
        view = self.client[:]
        ranks = sorted(self.client.ids)
        view.scatter('rank', ranks, flatten=True)
        rrank = pmod.Reference('rank')
        amr = view.map_async(lambda x: x * 2, [rrank] * len(view))
        drank = amr.get(5)
        self.assertEqual(drank, [r * 2 for r in ranks])

    def test_nested_getitem_setitem(self):
        """get and set with view['a.b']"""
        view = self.client[-1]
        view.execute('\n'.join([
            'class A(object): pass',
            'a = A()',
            'a.b = 128',
        ]), block=True)
        ra = pmod.Reference('a')

        r = view.apply_sync(lambda x: x.b, ra)
        self.assertEqual(r, 128)
        self.assertEqual(view['a.b'], 128)

        view['a.b'] = 0

        r = view.apply_sync(lambda x: x.b, ra)
        self.assertEqual(r, 0)
        self.assertEqual(view['a.b'], 0)

    def test_return_namedtuple(self):
        """a namedtuple returned from an engine keeps its fields"""
        def namedtuplify(x, y):
            from IPython.parallel.tests.test_view import point
            return point(x, y)

        view = self.client[-1]
        p = view.apply_sync(namedtuplify, 1, 2)
        self.assertEqual(p.x, 1)
        self.assertEqual(p.y, 2)

    def test_apply_namedtuple(self):
        """a namedtuple argument keeps its fields on the engine"""
        def echoxy(p):
            return p.y, p.x

        view = self.client[-1]
        tup = view.apply_sync(echoxy, point(1, 2))
        self.assertEqual(tup, (2, 1))