##// END OF EJS Templates
add inter-engine communication example
add inter-engine communication example

File last commit:

r3650:5e3faa67
r3653:864a845d
Show More
test_client.py
186 lines | 6.4 KiB | text/x-python | PythonLexer
import os
import time
from tempfile import mkstemp, mktemp

import nose.tools as nt

from IPython.zmq.parallel import client as clientmod
from IPython.zmq.parallel import error
from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult
from IPython.zmq.parallel.view import LoadBalancedView, DirectView

from clienttest import ClusterTestCase, segfault, wait
class TestClient(ClusterTestCase):
    """Tests for the parallel ``Client`` object available as ``self.client``.

    ``self.client``, ``add_engines`` and ``assertRaisesRemote`` are used
    throughout but are not defined here; presumably they are provided by
    ``ClusterTestCase`` -- confirm in ``clienttest``.
    """

    def test_ids(self):
        """test that added engines show up in client.ids"""
        n = len(self.client.ids)
        self.add_engines(3)
        self.assertEqual(len(self.client.ids), n + 3)

    def test_segfault(self):
        """test graceful handling of engine death"""
        self.add_engines(1)
        ar = self.client.apply(segfault, block=False)
        self.assertRaisesRemote(error.EngineError, ar.get)
        eid = ar.engine_id
        # Poll until the engine that ran `segfault` is gone from client.ids.
        # NOTE(review): there is no timeout here, so the test hangs if the
        # id is never removed.
        while eid in self.client.ids:
            time.sleep(.01)
            self.client.spin()

    def test_view_indexing(self):
        """test index access for views"""
        self.add_engines(2)
        # _build_targets returns a pair (see test_targets); the last item is
        # the list of targets.
        targets = self.client._build_targets('all')[-1]
        v = self.client[:]
        self.assertEqual(v.targets, targets)

        # single engine id
        t = self.client.ids[2]
        v = self.client[t]
        self.assertTrue(isinstance(v, DirectView))
        self.assertEqual(v.targets, t)

        # list of engine ids
        t = self.client.ids[2:4]
        v = self.client[t]
        self.assertTrue(isinstance(v, DirectView))
        self.assertEqual(v.targets, t)

        # slices
        v = self.client[::2]
        self.assertTrue(isinstance(v, DirectView))
        self.assertEqual(v.targets, targets[::2])
        v = self.client[1::3]
        self.assertTrue(isinstance(v, DirectView))
        self.assertEqual(v.targets, targets[1::3])
        v = self.client[:-3]
        self.assertTrue(isinstance(v, DirectView))
        self.assertEqual(v.targets, targets[:-3])

        # negative index
        v = self.client[-1]
        self.assertTrue(isinstance(v, DirectView))
        self.assertEqual(v.targets, targets[-1])

        # anything else is rejected
        nt.assert_raises(TypeError, lambda : self.client[None])

    def test_view_cache(self):
        """test that multiple view requests return the same object"""
        v = self.client[:2]
        v2 = self.client[:2]
        self.assertTrue(v is v2)
        v = self.client.view()
        v2 = self.client.view(balanced=True)
        self.assertTrue(v is v2)

    def test_targets(self):
        """test various valid targets arguments"""
        build = self.client._build_targets
        ids = self.client.ids
        # Only the targets half of the (idents, targets) pair is checked.
        _, targets = build(None)
        self.assertEqual(ids, targets)

    def test_clear(self):
        """test clear behavior"""
        self.add_engines(2)
        self.client.block = True
        self.client.push(dict(a=5))
        self.client.pull('a')
        id0 = self.client.ids[-1]
        # After clearing only the last engine, the other engines can still
        # pull 'a', but a pull from every engine raises.
        self.client.clear(targets=id0)
        self.client.pull('a', targets=self.client.ids[:-1])
        self.assertRaisesRemote(NameError, self.client.pull, 'a')
        # After clearing everything, every engine raises.
        self.client.clear()
        for i in self.client.ids:
            self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i)

    def test_push_pull(self):
        """test pushing and pulling"""
        data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
        self.add_engines(2)
        push = self.client.push
        pull = self.client.pull
        self.client.block = True
        nengines = len(self.client)

        # single target
        push({'data':data}, targets=0)
        d = pull('data', targets=0)
        self.assertEqual(d, data)

        # default targets: one result per engine
        push({'data':data})
        d = pull('data')
        self.assertEqual(d, nengines*[data])

        # non-blocking calls hand back an AsyncResult
        ar = push({'data':data}, block=False)
        self.assertTrue(isinstance(ar, AsyncResult))
        ar.get()
        ar = pull('data', block=False)
        self.assertTrue(isinstance(ar, AsyncResult))
        r = ar.get()
        self.assertEqual(r, nengines*[data])

        # pulling several names at once
        push(dict(a=10,b=20))
        r = pull(('a','b'))
        self.assertEqual(r, nengines*[[10,20]])

    def test_push_pull_function(self):
        """test pushing and pulling functions"""
        def testf(x):
            return 2.0*x

        self.add_engines(4)
        self.client.block = True
        push = self.client.push
        pull = self.client.pull
        execute = self.client.execute

        # single target
        push({'testf':testf}, targets=0)
        r = pull('testf', targets=0)
        self.assertEqual(r(1.0), testf(1.0))
        execute('r = testf(10)', targets=0)
        r = pull('r', targets=0)
        self.assertEqual(r, testf(10))

        # default targets, non-blocking
        ar = push({'testf':testf}, block=False)
        ar.get()
        ar = pull('testf', block=False)
        rlist = ar.get()
        for r in rlist:
            self.assertEqual(r(1.0), testf(1.0))

        # a function defined remotely can be pulled alongside a pushed one
        execute("def g(x): return x*x", targets=0)
        r = pull(('testf','g'),targets=0)
        self.assertEqual((r[0](10),r[1](10)), (testf(10), 100))

    def test_push_function_globals(self):
        """test that pushed functions have access to globals"""
        def geta():
            # `a` is deliberately not defined in this module.
            return a

        self.add_engines(1)
        v = self.client[-1]
        v.block = True
        v['f'] = geta
        # f() raises remotely until `a` has been defined via execute.
        self.assertRaisesRemote(NameError, v.execute, 'b=f()')
        v.execute('a=5')
        v.execute('b=f()')
        self.assertEqual(v['b'], 5)

    def test_get_result(self):
        """test getting results from the Hub."""
        # Submit from a second client, then ask self.client for the result.
        c = clientmod.Client(profile='iptest')
        t = self.client.ids[-1]
        ar = c.apply(wait, (1,), block=False, targets=t)
        time.sleep(.25)
        ahr = self.client.get_result(ar.msg_ids)
        self.assertTrue(isinstance(ahr, AsyncHubResult))
        self.assertEqual(ahr.get(), ar.get())
        # A second request for the same msg_ids is expected not to be an
        # AsyncHubResult.
        ar2 = self.client.get_result(ar.msg_ids)
        self.assertFalse(isinstance(ar2, AsyncHubResult))

    def test_ids_list(self):
        """test client.ids"""
        self.add_engines(2)
        ids = self.client.ids
        # client.ids must equal the private list but be a distinct object,
        # so that mutating it does not touch client._ids.
        self.assertEqual(ids, self.client._ids)
        self.assertFalse(ids is self.client._ids)
        ids.remove(ids[-1])
        self.assertNotEqual(ids, self.client._ids)

    def test_arun_newline(self):
        """test that run appends newline to files"""
        # mkstemp creates the file atomically (mktemp only returns a name),
        # and the finally clause makes sure it is removed afterwards.
        fd, tmpfile = mkstemp()
        try:
            with os.fdopen(fd, 'w') as f:
                # The script intentionally has no trailing newline.
                f.write("def g():\n    return 5")
            v = self.client[-1]
            v.run(tmpfile, block=True)
            # `g` is not defined in this module; the lambda is passed to
            # apply_sync_bound after `run` has processed the file.
            self.assertEqual(v.apply_sync_bound(lambda : g()), 5)
        finally:
            os.remove(tmpfile)