##// END OF EJS Templates
some initial tests for newparallel
some initial tests for newparallel

File last commit:

r3637:e0abb035
r3637:e0abb035
Show More
test_client.py
116 lines | 3.7 KiB | text/x-python | PythonLexer
import time
import nose.tools as nt
from IPython.zmq.parallel.asyncresult import AsyncResult
from IPython.zmq.parallel.view import LoadBalancedView, DirectView
from clienttest import ClusterTestCase, segfault
class TestClient(ClusterTestCase):
def test_ids(self):
self.assertEquals(len(self.client.ids), 1)
self.add_engines(3)
self.assertEquals(len(self.client.ids), 4)
def test_segfault(self):
self.add_engines(1)
eid = self.client.ids[-1]
self.client[eid].apply(segfault)
while eid in self.client.ids:
time.sleep(.01)
self.client.spin()
def test_view_indexing(self):
self.add_engines(4)
targets = self.client._build_targets('all')[-1]
v = self.client[:]
self.assertEquals(v.targets, targets)
t = self.client.ids[2]
v = self.client[t]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, t)
t = self.client.ids[2:4]
v = self.client[t]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, t)
v = self.client[::2]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, targets[::2])
v = self.client[1::3]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, targets[1::3])
v = self.client[:-3]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, targets[:-3])
nt.assert_raises(TypeError, lambda : self.client[None])
def test_view_cache(self):
"""test blocking and non-blocking behavior"""
v = self.client[:2]
v2 =self.client[:2]
self.assertTrue(v is v2)
v = self.client.view()
v2 = self.client.view(balanced=True)
self.assertTrue(v is v2)
def test_targets(self):
"""test various valid targets arguments"""
pass
def test_clear(self):
"""test clear behavior"""
# self.add_engines(4)
# self.client.push()
def test_push_pull(self):
data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
self.add_engines(4)
push = self.client.push
pull = self.client.pull
self.client.block=True
nengines = len(self.client)
push({'data':data}, targets=0)
d = pull('data', targets=0)
self.assertEquals(d, data)
push({'data':data})
d = pull('data')
self.assertEquals(d, nengines*[data])
ar = push({'data':data}, block=False)
self.assertTrue(isinstance(ar, AsyncResult))
r = ar.get()
ar = pull('data', block=False)
self.assertTrue(isinstance(ar, AsyncResult))
r = ar.get()
self.assertEquals(r, nengines*[data])
push(dict(a=10,b=20))
r = pull(('a','b'))
self.assertEquals(r, nengines*[[10,20]])
def test_push_pull_function(self):
def testf(x):
return 2.0*x
self.add_engines(4)
self.client.block=True
push = self.client.push
pull = self.client.pull
execute = self.client.execute
push({'testf':testf}, targets=0)
r = pull('testf', targets=0)
self.assertEqual(r(1.0), testf(1.0))
execute('r = testf(10)', targets=0)
r = pull('r', targets=0)
self.assertEquals(r, testf(10))
ar = push({'testf':testf}, block=False)
ar.get()
ar = pull('testf', block=False)
rlist = ar.get()
for r in rlist:
self.assertEqual(r(1.0), testf(1.0))
execute("def g(x): return x*x", targets=0)
r = pull(('testf','g'),targets=0)
self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))