|
|
import time
|
|
|
|
|
|
import nose.tools as nt
|
|
|
|
|
|
from IPython.zmq.parallel import client as clientmod
|
|
|
from IPython.zmq.parallel import error
|
|
|
from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult
|
|
|
from IPython.zmq.parallel.view import LoadBalancedView, DirectView
|
|
|
|
|
|
from clienttest import ClusterTestCase, segfault, wait
|
|
|
|
|
|
class TestClient(ClusterTestCase):
|
|
|
|
|
|
def test_ids(self):
|
|
|
n = len(self.client.ids)
|
|
|
self.add_engines(3)
|
|
|
self.assertEquals(len(self.client.ids), n+3)
|
|
|
self.assertTrue
|
|
|
|
|
|
def test_segfault(self):
|
|
|
"""test graceful handling of engine death"""
|
|
|
self.add_engines(1)
|
|
|
eid = self.client.ids[-1]
|
|
|
ar = self.client.apply(segfault, block=False)
|
|
|
self.assertRaisesRemote(error.EngineError, ar.get)
|
|
|
eid = ar.engine_id
|
|
|
while eid in self.client.ids:
|
|
|
time.sleep(.01)
|
|
|
self.client.spin()
|
|
|
|
|
|
def test_view_indexing(self):
|
|
|
"""test index access for views"""
|
|
|
self.add_engines(2)
|
|
|
targets = self.client._build_targets('all')[-1]
|
|
|
v = self.client[:]
|
|
|
self.assertEquals(v.targets, targets)
|
|
|
t = self.client.ids[2]
|
|
|
v = self.client[t]
|
|
|
self.assert_(isinstance(v, DirectView))
|
|
|
self.assertEquals(v.targets, t)
|
|
|
t = self.client.ids[2:4]
|
|
|
v = self.client[t]
|
|
|
self.assert_(isinstance(v, DirectView))
|
|
|
self.assertEquals(v.targets, t)
|
|
|
v = self.client[::2]
|
|
|
self.assert_(isinstance(v, DirectView))
|
|
|
self.assertEquals(v.targets, targets[::2])
|
|
|
v = self.client[1::3]
|
|
|
self.assert_(isinstance(v, DirectView))
|
|
|
self.assertEquals(v.targets, targets[1::3])
|
|
|
v = self.client[:-3]
|
|
|
self.assert_(isinstance(v, DirectView))
|
|
|
self.assertEquals(v.targets, targets[:-3])
|
|
|
v = self.client[-1]
|
|
|
self.assert_(isinstance(v, DirectView))
|
|
|
self.assertEquals(v.targets, targets[-1])
|
|
|
nt.assert_raises(TypeError, lambda : self.client[None])
|
|
|
|
|
|
def test_view_cache(self):
|
|
|
"""test that multiple view requests return the same object"""
|
|
|
v = self.client[:2]
|
|
|
v2 =self.client[:2]
|
|
|
self.assertTrue(v is v2)
|
|
|
v = self.client.view()
|
|
|
v2 = self.client.view(balanced=True)
|
|
|
self.assertTrue(v is v2)
|
|
|
|
|
|
def test_targets(self):
|
|
|
"""test various valid targets arguments"""
|
|
|
build = self.client._build_targets
|
|
|
ids = self.client.ids
|
|
|
idents,targets = build(None)
|
|
|
self.assertEquals(ids, targets)
|
|
|
|
|
|
def test_clear(self):
|
|
|
"""test clear behavior"""
|
|
|
self.add_engines(2)
|
|
|
self.client.block=True
|
|
|
self.client.push(dict(a=5))
|
|
|
self.client.pull('a')
|
|
|
id0 = self.client.ids[-1]
|
|
|
self.client.clear(targets=id0)
|
|
|
self.client.pull('a', targets=self.client.ids[:-1])
|
|
|
self.assertRaisesRemote(NameError, self.client.pull, 'a')
|
|
|
self.client.clear()
|
|
|
for i in self.client.ids:
|
|
|
self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i)
|
|
|
|
|
|
|
|
|
def test_push_pull(self):
|
|
|
"""test pushing and pulling"""
|
|
|
data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
|
|
|
self.add_engines(2)
|
|
|
push = self.client.push
|
|
|
pull = self.client.pull
|
|
|
self.client.block=True
|
|
|
nengines = len(self.client)
|
|
|
push({'data':data}, targets=0)
|
|
|
d = pull('data', targets=0)
|
|
|
self.assertEquals(d, data)
|
|
|
push({'data':data})
|
|
|
d = pull('data')
|
|
|
self.assertEquals(d, nengines*[data])
|
|
|
ar = push({'data':data}, block=False)
|
|
|
self.assertTrue(isinstance(ar, AsyncResult))
|
|
|
r = ar.get()
|
|
|
ar = pull('data', block=False)
|
|
|
self.assertTrue(isinstance(ar, AsyncResult))
|
|
|
r = ar.get()
|
|
|
self.assertEquals(r, nengines*[data])
|
|
|
push(dict(a=10,b=20))
|
|
|
r = pull(('a','b'))
|
|
|
self.assertEquals(r, nengines*[[10,20]])
|
|
|
|
|
|
def test_push_pull_function(self):
|
|
|
"test pushing and pulling functions"
|
|
|
def testf(x):
|
|
|
return 2.0*x
|
|
|
|
|
|
self.add_engines(4)
|
|
|
self.client.block=True
|
|
|
push = self.client.push
|
|
|
pull = self.client.pull
|
|
|
execute = self.client.execute
|
|
|
push({'testf':testf}, targets=0)
|
|
|
r = pull('testf', targets=0)
|
|
|
self.assertEqual(r(1.0), testf(1.0))
|
|
|
execute('r = testf(10)', targets=0)
|
|
|
r = pull('r', targets=0)
|
|
|
self.assertEquals(r, testf(10))
|
|
|
ar = push({'testf':testf}, block=False)
|
|
|
ar.get()
|
|
|
ar = pull('testf', block=False)
|
|
|
rlist = ar.get()
|
|
|
for r in rlist:
|
|
|
self.assertEqual(r(1.0), testf(1.0))
|
|
|
execute("def g(x): return x*x", targets=0)
|
|
|
r = pull(('testf','g'),targets=0)
|
|
|
self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
|
|
|
|
|
|
def test_push_function_globals(self):
|
|
|
"""test that pushed functions have access to globals"""
|
|
|
def geta():
|
|
|
return a
|
|
|
self.add_engines(1)
|
|
|
v = self.client[-1]
|
|
|
v.block=True
|
|
|
v['f'] = geta
|
|
|
self.assertRaisesRemote(NameError, v.execute, 'b=f()')
|
|
|
v.execute('a=5')
|
|
|
v.execute('b=f()')
|
|
|
self.assertEquals(v['b'], 5)
|
|
|
|
|
|
def test_get_result(self):
|
|
|
"""test getting results from the Hub."""
|
|
|
c = clientmod.Client(profile='iptest')
|
|
|
t = self.client.ids[-1]
|
|
|
ar = c.apply(wait, (1,), block=False, targets=t)
|
|
|
time.sleep(.25)
|
|
|
ahr = self.client.get_result(ar.msg_ids)
|
|
|
self.assertTrue(isinstance(ahr, AsyncHubResult))
|
|
|
self.assertEquals(ahr.get(), ar.get())
|
|
|
ar2 = self.client.get_result(ar.msg_ids)
|
|
|
self.assertFalse(isinstance(ar2, AsyncHubResult))
|
|
|
|