##// END OF EJS Templates
add inter-engine communication example
add inter-engine communication example

File last commit:

r3650:5e3faa67
r3653:864a845d
Show More
test_client.py
186 lines | 6.4 KiB | text/x-python | PythonLexer
MinRK
added preliminary tests for zmq.parallel
r3595 import time
MinRK
fix small client bugs + tests
r3650 from tempfile import mktemp
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
some initial tests for newparallel
r3637 import nose.tools as nt
MinRK
testing fixes
r3641 from IPython.zmq.parallel import client as clientmod
from IPython.zmq.parallel import error
from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult
MinRK
added preliminary tests for zmq.parallel
r3595 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
MinRK
testing fixes
r3641 from clienttest import ClusterTestCase, segfault, wait
MinRK
added preliminary tests for zmq.parallel
r3595
class TestClient(ClusterTestCase):
def test_ids(self):
MinRK
testing fixes
r3641 n = len(self.client.ids)
MinRK
added preliminary tests for zmq.parallel
r3595 self.add_engines(3)
MinRK
testing fixes
r3641 self.assertEquals(len(self.client.ids), n+3)
self.assertTrue
MinRK
added preliminary tests for zmq.parallel
r3595
def test_segfault(self):
MinRK
testing fixes
r3641 """test graceful handling of engine death"""
MinRK
some initial tests for newparallel
r3637 self.add_engines(1)
eid = self.client.ids[-1]
MinRK
testing fixes
r3641 ar = self.client.apply(segfault, block=False)
self.assertRaisesRemote(error.EngineError, ar.get)
eid = ar.engine_id
MinRK
some initial tests for newparallel
r3637 while eid in self.client.ids:
MinRK
added preliminary tests for zmq.parallel
r3595 time.sleep(.01)
self.client.spin()
def test_view_indexing(self):
MinRK
testing fixes
r3641 """test index access for views"""
self.add_engines(2)
MinRK
added preliminary tests for zmq.parallel
r3595 targets = self.client._build_targets('all')[-1]
v = self.client[:]
self.assertEquals(v.targets, targets)
MinRK
some initial tests for newparallel
r3637 t = self.client.ids[2]
v = self.client[t]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, t)
t = self.client.ids[2:4]
v = self.client[t]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, t)
v = self.client[::2]
self.assert_(isinstance(v, DirectView))
MinRK
added preliminary tests for zmq.parallel
r3595 self.assertEquals(v.targets, targets[::2])
MinRK
some initial tests for newparallel
r3637 v = self.client[1::3]
self.assert_(isinstance(v, DirectView))
MinRK
added preliminary tests for zmq.parallel
r3595 self.assertEquals(v.targets, targets[1::3])
MinRK
some initial tests for newparallel
r3637 v = self.client[:-3]
self.assert_(isinstance(v, DirectView))
MinRK
added preliminary tests for zmq.parallel
r3595 self.assertEquals(v.targets, targets[:-3])
MinRK
fix/test pushed function globals
r3638 v = self.client[-1]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, targets[-1])
MinRK
some initial tests for newparallel
r3637 nt.assert_raises(TypeError, lambda : self.client[None])
def test_view_cache(self):
MinRK
fix/test pushed function globals
r3638 """test that multiple view requests return the same object"""
MinRK
some initial tests for newparallel
r3637 v = self.client[:2]
v2 =self.client[:2]
self.assertTrue(v is v2)
v = self.client.view()
v2 = self.client.view(balanced=True)
self.assertTrue(v is v2)
def test_targets(self):
"""test various valid targets arguments"""
MinRK
testing fixes
r3641 build = self.client._build_targets
ids = self.client.ids
idents,targets = build(None)
self.assertEquals(ids, targets)
MinRK
some initial tests for newparallel
r3637
def test_clear(self):
"""test clear behavior"""
MinRK
testing fixes
r3641 self.add_engines(2)
self.client.block=True
self.client.push(dict(a=5))
self.client.pull('a')
id0 = self.client.ids[-1]
self.client.clear(targets=id0)
self.client.pull('a', targets=self.client.ids[:-1])
self.assertRaisesRemote(NameError, self.client.pull, 'a')
self.client.clear()
for i in self.client.ids:
self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i)
MinRK
some initial tests for newparallel
r3637
def test_push_pull(self):
MinRK
fix/test pushed function globals
r3638 """test pushing and pulling"""
MinRK
some initial tests for newparallel
r3637 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
MinRK
testing fixes
r3641 self.add_engines(2)
MinRK
some initial tests for newparallel
r3637 push = self.client.push
pull = self.client.pull
self.client.block=True
nengines = len(self.client)
push({'data':data}, targets=0)
d = pull('data', targets=0)
self.assertEquals(d, data)
push({'data':data})
d = pull('data')
self.assertEquals(d, nengines*[data])
ar = push({'data':data}, block=False)
self.assertTrue(isinstance(ar, AsyncResult))
r = ar.get()
ar = pull('data', block=False)
self.assertTrue(isinstance(ar, AsyncResult))
r = ar.get()
self.assertEquals(r, nengines*[data])
push(dict(a=10,b=20))
r = pull(('a','b'))
self.assertEquals(r, nengines*[[10,20]])
def test_push_pull_function(self):
MinRK
fix/test pushed function globals
r3638 "test pushing and pulling functions"
MinRK
some initial tests for newparallel
r3637 def testf(x):
return 2.0*x
self.add_engines(4)
self.client.block=True
push = self.client.push
pull = self.client.pull
execute = self.client.execute
push({'testf':testf}, targets=0)
r = pull('testf', targets=0)
self.assertEqual(r(1.0), testf(1.0))
execute('r = testf(10)', targets=0)
r = pull('r', targets=0)
self.assertEquals(r, testf(10))
ar = push({'testf':testf}, block=False)
ar.get()
ar = pull('testf', block=False)
rlist = ar.get()
for r in rlist:
self.assertEqual(r(1.0), testf(1.0))
execute("def g(x): return x*x", targets=0)
r = pull(('testf','g'),targets=0)
self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
MinRK
fix/test pushed function globals
r3638
def test_push_function_globals(self):
"""test that pushed functions have access to globals"""
def geta():
return a
self.add_engines(1)
v = self.client[-1]
v.block=True
v['f'] = geta
self.assertRaisesRemote(NameError, v.execute, 'b=f()')
v.execute('a=5')
v.execute('b=f()')
self.assertEquals(v['b'], 5)
MinRK
some initial tests for newparallel
r3637
MinRK
testing fixes
r3641 def test_get_result(self):
"""test getting results from the Hub."""
c = clientmod.Client(profile='iptest')
t = self.client.ids[-1]
ar = c.apply(wait, (1,), block=False, targets=t)
time.sleep(.25)
ahr = self.client.get_result(ar.msg_ids)
self.assertTrue(isinstance(ahr, AsyncHubResult))
self.assertEquals(ahr.get(), ar.get())
ar2 = self.client.get_result(ar.msg_ids)
self.assertFalse(isinstance(ar2, AsyncHubResult))
MinRK
fix small client bugs + tests
r3650
def test_ids_list(self):
"""test client.ids"""
self.add_engines(2)
ids = self.client.ids
self.assertEquals(ids, self.client._ids)
self.assertFalse(ids is self.client._ids)
ids.remove(ids[-1])
self.assertNotEquals(ids, self.client._ids)
def test_arun_newline(self):
"""test that run appends newline to files"""
tmpfile = mktemp()
with open(tmpfile, 'w') as f:
f.write("""def g():
return 5
""")
v = self.client[-1]
v.run(tmpfile, block=True)
self.assertEquals(v.apply_sync_bound(lambda : g()), 5)
MinRK
added preliminary tests for zmq.parallel
r3595