##// END OF EJS Templates
Update PBS/SGE launchers with 0.10.1 options and defaults
Update PBS/SGE launchers with 0.10.1 options and defaults

File last commit:

r3658:8fb951e7
r3659:42860010
Show More
test_client.py
262 lines | 8.9 KiB | text/x-python | PythonLexer
MinRK
added preliminary tests for zmq.parallel
r3595 import time
MinRK
fix small client bugs + tests
r3650 from tempfile import mktemp
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
some initial tests for newparallel
r3637 import nose.tools as nt
MinRK
add message tracking to client, add/improve tests
r3654 import zmq
MinRK
some initial tests for newparallel
r3637
MinRK
testing fixes
r3641 from IPython.zmq.parallel import client as clientmod
from IPython.zmq.parallel import error
from IPython.zmq.parallel.asyncresult import AsyncResult, AsyncHubResult
MinRK
added preliminary tests for zmq.parallel
r3595 from IPython.zmq.parallel.view import LoadBalancedView, DirectView
MinRK
testing fixes
r3641 from clienttest import ClusterTestCase, segfault, wait
MinRK
added preliminary tests for zmq.parallel
r3595
class TestClient(ClusterTestCase):
def test_ids(self):
MinRK
testing fixes
r3641 n = len(self.client.ids)
MinRK
added preliminary tests for zmq.parallel
r3595 self.add_engines(3)
MinRK
testing fixes
r3641 self.assertEquals(len(self.client.ids), n+3)
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
add message tracking to client, add/improve tests
r3654 def test_segfault_task(self):
"""test graceful handling of engine death (balanced)"""
MinRK
some initial tests for newparallel
r3637 self.add_engines(1)
MinRK
testing fixes
r3641 ar = self.client.apply(segfault, block=False)
self.assertRaisesRemote(error.EngineError, ar.get)
eid = ar.engine_id
MinRK
some initial tests for newparallel
r3637 while eid in self.client.ids:
MinRK
added preliminary tests for zmq.parallel
r3595 time.sleep(.01)
self.client.spin()
MinRK
add message tracking to client, add/improve tests
r3654 def test_segfault_mux(self):
"""test graceful handling of engine death (direct)"""
self.add_engines(1)
eid = self.client.ids[-1]
ar = self.client[eid].apply_async(segfault)
self.assertRaisesRemote(error.EngineError, ar.get)
eid = ar.engine_id
while eid in self.client.ids:
time.sleep(.01)
self.client.spin()
MinRK
added preliminary tests for zmq.parallel
r3595 def test_view_indexing(self):
MinRK
testing fixes
r3641 """test index access for views"""
self.add_engines(2)
MinRK
added preliminary tests for zmq.parallel
r3595 targets = self.client._build_targets('all')[-1]
v = self.client[:]
self.assertEquals(v.targets, targets)
MinRK
some initial tests for newparallel
r3637 t = self.client.ids[2]
v = self.client[t]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, t)
t = self.client.ids[2:4]
v = self.client[t]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, t)
v = self.client[::2]
self.assert_(isinstance(v, DirectView))
MinRK
added preliminary tests for zmq.parallel
r3595 self.assertEquals(v.targets, targets[::2])
MinRK
some initial tests for newparallel
r3637 v = self.client[1::3]
self.assert_(isinstance(v, DirectView))
MinRK
added preliminary tests for zmq.parallel
r3595 self.assertEquals(v.targets, targets[1::3])
MinRK
some initial tests for newparallel
r3637 v = self.client[:-3]
self.assert_(isinstance(v, DirectView))
MinRK
added preliminary tests for zmq.parallel
r3595 self.assertEquals(v.targets, targets[:-3])
MinRK
fix/test pushed function globals
r3638 v = self.client[-1]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, targets[-1])
MinRK
some initial tests for newparallel
r3637 nt.assert_raises(TypeError, lambda : self.client[None])
def test_view_cache(self):
MinRK
fix/test pushed function globals
r3638 """test that multiple view requests return the same object"""
MinRK
some initial tests for newparallel
r3637 v = self.client[:2]
v2 =self.client[:2]
self.assertTrue(v is v2)
v = self.client.view()
v2 = self.client.view(balanced=True)
self.assertTrue(v is v2)
def test_targets(self):
"""test various valid targets arguments"""
MinRK
testing fixes
r3641 build = self.client._build_targets
ids = self.client.ids
idents,targets = build(None)
self.assertEquals(ids, targets)
MinRK
some initial tests for newparallel
r3637
def test_clear(self):
"""test clear behavior"""
MinRK
testing fixes
r3641 self.add_engines(2)
self.client.block=True
self.client.push(dict(a=5))
self.client.pull('a')
id0 = self.client.ids[-1]
self.client.clear(targets=id0)
self.client.pull('a', targets=self.client.ids[:-1])
self.assertRaisesRemote(NameError, self.client.pull, 'a')
self.client.clear()
for i in self.client.ids:
self.assertRaisesRemote(NameError, self.client.pull, 'a', targets=i)
MinRK
some initial tests for newparallel
r3637
def test_push_pull(self):
MinRK
fix/test pushed function globals
r3638 """test pushing and pulling"""
MinRK
some initial tests for newparallel
r3637 data = dict(a=10, b=1.05, c=range(10), d={'e':(1,2),'f':'hi'})
MinRK
add message tracking to client, add/improve tests
r3654 t = self.client.ids[-1]
MinRK
testing fixes
r3641 self.add_engines(2)
MinRK
some initial tests for newparallel
r3637 push = self.client.push
pull = self.client.pull
self.client.block=True
nengines = len(self.client)
MinRK
add message tracking to client, add/improve tests
r3654 push({'data':data}, targets=t)
d = pull('data', targets=t)
MinRK
some initial tests for newparallel
r3637 self.assertEquals(d, data)
push({'data':data})
d = pull('data')
self.assertEquals(d, nengines*[data])
ar = push({'data':data}, block=False)
self.assertTrue(isinstance(ar, AsyncResult))
r = ar.get()
ar = pull('data', block=False)
self.assertTrue(isinstance(ar, AsyncResult))
r = ar.get()
self.assertEquals(r, nengines*[data])
push(dict(a=10,b=20))
r = pull(('a','b'))
self.assertEquals(r, nengines*[[10,20]])
def test_push_pull_function(self):
MinRK
fix/test pushed function globals
r3638 "test pushing and pulling functions"
MinRK
some initial tests for newparallel
r3637 def testf(x):
return 2.0*x
self.add_engines(4)
MinRK
add message tracking to client, add/improve tests
r3654 t = self.client.ids[-1]
MinRK
some initial tests for newparallel
r3637 self.client.block=True
push = self.client.push
pull = self.client.pull
execute = self.client.execute
MinRK
add message tracking to client, add/improve tests
r3654 push({'testf':testf}, targets=t)
r = pull('testf', targets=t)
MinRK
some initial tests for newparallel
r3637 self.assertEqual(r(1.0), testf(1.0))
MinRK
add message tracking to client, add/improve tests
r3654 execute('r = testf(10)', targets=t)
r = pull('r', targets=t)
MinRK
some initial tests for newparallel
r3637 self.assertEquals(r, testf(10))
ar = push({'testf':testf}, block=False)
ar.get()
ar = pull('testf', block=False)
rlist = ar.get()
for r in rlist:
self.assertEqual(r(1.0), testf(1.0))
MinRK
add message tracking to client, add/improve tests
r3654 execute("def g(x): return x*x", targets=t)
r = pull(('testf','g'),targets=t)
MinRK
some initial tests for newparallel
r3637 self.assertEquals((r[0](10),r[1](10)), (testf(10), 100))
MinRK
fix/test pushed function globals
r3638
def test_push_function_globals(self):
"""test that pushed functions have access to globals"""
def geta():
return a
self.add_engines(1)
v = self.client[-1]
v.block=True
v['f'] = geta
self.assertRaisesRemote(NameError, v.execute, 'b=f()')
v.execute('a=5')
v.execute('b=f()')
self.assertEquals(v['b'], 5)
MinRK
some initial tests for newparallel
r3637
MinRK
reflect revised apply_bound pattern
r3655 def test_push_function_defaults(self):
"""test that pushed functions preserve default args"""
def echo(a=10):
return a
self.add_engines(1)
v = self.client[-1]
v.block=True
v['f'] = echo
v.execute('b=f()')
self.assertEquals(v['b'], 10)
MinRK
testing fixes
r3641 def test_get_result(self):
"""test getting results from the Hub."""
c = clientmod.Client(profile='iptest')
MinRK
update connections and diagrams for reduced sockets
r3658 self.add_engines(1)
MinRK
testing fixes
r3641 ar = c.apply(wait, (1,), block=False, targets=t)
MinRK
update connections and diagrams for reduced sockets
r3658 # give the monitor time to notice the message
MinRK
testing fixes
r3641 time.sleep(.25)
ahr = self.client.get_result(ar.msg_ids)
self.assertTrue(isinstance(ahr, AsyncHubResult))
self.assertEquals(ahr.get(), ar.get())
ar2 = self.client.get_result(ar.msg_ids)
self.assertFalse(isinstance(ar2, AsyncHubResult))
MinRK
fix small client bugs + tests
r3650
def test_ids_list(self):
"""test client.ids"""
self.add_engines(2)
ids = self.client.ids
self.assertEquals(ids, self.client._ids)
self.assertFalse(ids is self.client._ids)
ids.remove(ids[-1])
self.assertNotEquals(ids, self.client._ids)
MinRK
add message tracking to client, add/improve tests
r3654 def test_run_newline(self):
MinRK
fix small client bugs + tests
r3650 """test that run appends newline to files"""
tmpfile = mktemp()
with open(tmpfile, 'w') as f:
f.write("""def g():
return 5
""")
v = self.client[-1]
v.run(tmpfile, block=True)
MinRK
reflect revised apply_bound pattern
r3655 self.assertEquals(v.apply_sync(lambda : g()), 5)
MinRK
fix small client bugs + tests
r3650
MinRK
add message tracking to client, add/improve tests
r3654 def test_apply_tracked(self):
"""test tracking for apply"""
# self.add_engines(1)
t = self.client.ids[-1]
self.client.block=False
def echo(n=1024*1024, **kwargs):
return self.client.apply(lambda x: x, args=('x'*n,), targets=t, **kwargs)
ar = echo(1)
self.assertTrue(ar._tracker is None)
self.assertTrue(ar.sent)
ar = echo(track=True)
self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
self.assertEquals(ar.sent, ar._tracker.done)
ar._tracker.wait()
self.assertTrue(ar.sent)
def test_push_tracked(self):
t = self.client.ids[-1]
ns = dict(x='x'*1024*1024)
ar = self.client.push(ns, targets=t, block=False)
self.assertTrue(ar._tracker is None)
self.assertTrue(ar.sent)
ar = self.client.push(ns, targets=t, block=False, track=True)
self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
self.assertEquals(ar.sent, ar._tracker.done)
ar._tracker.wait()
self.assertTrue(ar.sent)
ar.get()
def test_scatter_tracked(self):
t = self.client.ids
x='x'*1024*1024
ar = self.client.scatter('x', x, targets=t, block=False)
self.assertTrue(ar._tracker is None)
self.assertTrue(ar.sent)
ar = self.client.scatter('x', x, targets=t, block=False, track=True)
self.assertTrue(isinstance(ar._tracker, zmq.MessageTracker))
self.assertEquals(ar.sent, ar._tracker.done)
ar._tracker.wait()
self.assertTrue(ar.sent)
ar.get()
def test_remote_reference(self):
v = self.client[-1]
v['a'] = 123
ra = clientmod.Reference('a')
MinRK
reflect revised apply_bound pattern
r3655 b = v.apply_sync(lambda x: x, ra)
MinRK
add message tracking to client, add/improve tests
r3654 self.assertEquals(b, 123)