##// END OF EJS Templates
Reconnect when the websocket connection closes unexpectedly
Reconnect when the websocket connection closes unexpectedly

File last commit:

r7295:3f19ba07
r7360:4be00dd0
Show More
test_client.py
422 lines | 14.7 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """Tests for parallel client.py
Authors:
* Min RK
"""
MinRK
update API after sagedays29...
r3664
#-------------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
MinRK
update parallel code for py3k...
r4155 from __future__ import division
MinRK
added preliminary tests for zmq.parallel
r3595 import time
MinRK
add db_query and hub_history tests to test_client
r3782 from datetime import datetime
MinRK
fix small client bugs + tests
r3650 from tempfile import mktemp
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
add message tracking to client, add/improve tests
r3654 import zmq
MinRK
some initial tests for newparallel
r3637
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.client import client as clientmod
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from IPython.parallel import error
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel import AsyncResult, AsyncHubResult
from IPython.parallel import LoadBalancedView, DirectView
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
update API after sagedays29...
r3664 from clienttest import ClusterTestCase, segfault, wait, add_engines
def setup():
    """Start-up hook for this test module.

    Calls ``add_engines(4, total=True)``; presumably this guarantees that
    four engines in total are available before the tests run (confirm
    against ``clienttest.add_engines``).
    """
    add_engines(4, total=True)
MinRK
added preliminary tests for zmq.parallel
r3595
class TestClient(ClusterTestCase):
def test_ids(self):
    """Adding two engines must grow ``client.ids`` by exactly two."""
    count_before = len(self.client.ids)
    self.add_engines(2)
    # assertEqual replaces the deprecated assertEquals alias.
    self.assertEqual(len(self.client.ids), count_before + 2)
MinRK
added preliminary tests for zmq.parallel
r3595
def test_view_indexing(self):
    """test index access for views

    Indexing the client with an engine id, a list of ids, a slice or a
    negative index must yield a DirectView whose ``targets`` match the
    selection; indexing with ``None`` must raise TypeError.
    """
    self.minimum_engines(4)
    targets = self.client._build_targets('all')[-1]

    # full slice: only the targets are compared
    v = self.client[:]
    self.assertEqual(v.targets, targets)

    # a single engine id
    t = self.client.ids[2]
    v = self.client[t]
    self.assertTrue(isinstance(v, DirectView))
    self.assertEqual(v.targets, t)

    # an explicit list of engine ids
    t = self.client.ids[2:4]
    v = self.client[t]
    self.assertTrue(isinstance(v, DirectView))
    self.assertEqual(v.targets, t)

    # slices of the client must select the same engines as slicing targets
    for sl in (slice(None, None, 2), slice(1, None, 3), slice(None, -3)):
        v = self.client[sl]
        self.assertTrue(isinstance(v, DirectView))
        self.assertEqual(v.targets, targets[sl])

    # negative integer index
    v = self.client[-1]
    self.assertTrue(isinstance(v, DirectView))
    self.assertEqual(v.targets, targets[-1])

    self.assertRaises(TypeError, lambda: self.client[None])
MinRK
some initial tests for newparallel
r3637
MinRK
move IPython.zmq.parallel to IPython.parallel
def test_lbview_targets(self):
    """test load_balanced_view targets

    With no argument and with ``'all'`` the view's ``targets`` is None;
    with ``-1`` it is a one-element list holding the last engine id.
    """
    view = self.client.load_balanced_view()
    self.assertEqual(view.targets, None)

    view = self.client.load_balanced_view(-1)
    self.assertEqual(view.targets, [self.client.ids[-1]])

    view = self.client.load_balanced_view('all')
    self.assertEqual(view.targets, None)
def test_dview_targets(self):
    """test direct_view targets

    With no argument and with ``'all'`` the view keeps the literal
    ``'all'`` as its targets; with ``-1`` it holds the last engine id.
    """
    view = self.client.direct_view()
    self.assertEqual(view.targets, 'all')

    view = self.client.direct_view('all')
    self.assertEqual(view.targets, 'all')

    view = self.client.direct_view(-1)
    self.assertEqual(view.targets, self.client.ids[-1])
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666
MinRK
handle targets='all' in remotefunction...
def test_lazy_all_targets(self):
    """test lazy evaluation of rc.direct_view('all')

    A view created before engines are added must still reach the engines
    added afterwards: ``apply_sync`` returns one result per current
    engine, and ``map_sync`` keeps producing the reference result.
    """
    v = self.client.direct_view()
    self.assertEqual(v.targets, 'all')

    def double(x):
        return x*2

    seq = range(100)
    ref = [double(x) for x in seq]

    # add some engines, which should be used
    self.add_engines(1)
    n1 = len(self.client.ids)

    # simple apply
    r = v.apply_sync(lambda: 1)
    self.assertEqual(r, [1] * n1)

    # map goes through remotefunction
    r = v.map_sync(double, seq)
    self.assertEqual(r, ref)

    # add a couple more engines, and try again
    self.add_engines(2)
    n2 = len(self.client.ids)
    self.assertNotEqual(n2, n1)

    # apply
    r = v.apply_sync(lambda: 1)
    self.assertEqual(r, [1] * n2)

    # map
    r = v.map_sync(double, seq)
    self.assertEqual(r, ref)
MinRK
some initial tests for newparallel
def test_targets(self):
    """test that _build_targets(None) resolves to every engine id

    Only the ``None`` argument is exercised here; the second element of
    the returned pair must equal ``client.ids``.
    """
    build = self.client._build_targets
    ids = self.client.ids
    idents, targets = build(None)
    self.assertEqual(ids, targets)
MinRK
some initial tests for newparallel
r3637
def test_clear(self):
    """test clear behavior

    Pushes ``a`` to every engine, clears only the last engine and checks
    that ``a`` is gone there, then clears everything and checks that
    ``a`` is gone on every engine.
    """
    self.minimum_engines(2)

    everyone = self.client[:]
    everyone.block = True
    everyone.push({'a': 5})
    everyone.pull('a')

    last_id = self.client.ids[-1]
    self.client.clear(targets=last_id, block=True)
    # the engines that were not cleared are still asked for 'a'
    self.client[:-1].get('a')
    self.assertRaisesRemote(NameError, self.client[last_id].get, 'a')

    self.client.clear(block=True)
    for engine_id in self.client.ids:
        self.assertRaisesRemote(NameError, self.client[engine_id].get, 'a')
MinRK
some initial tests for newparallel
r3637
MinRK
testing fixes
def test_get_result(self):
    """test getting results from the Hub.

    A task is submitted through a second client; the first lookup via
    ``self.client.get_result`` must be an AsyncHubResult with the same
    value, and a repeated lookup must not be an AsyncHubResult.
    """
    c = clientmod.Client(profile='iptest')
    try:
        t = c.ids[-1]
        ar = c[t].apply_async(wait, 1)
        # give the monitor time to notice the message
        time.sleep(.25)
        ahr = self.client.get_result(ar.msg_ids)
        self.assertTrue(isinstance(ahr, AsyncHubResult))
        self.assertEqual(ahr.get(), ar.get())
        ar2 = self.client.get_result(ar.msg_ids)
        self.assertFalse(isinstance(ar2, AsyncHubResult))
    finally:
        # close the extra client even when an assertion above fails
        c.close()
MinRK
fix small client bugs + tests
r3650
def test_ids_list(self):
    """test client.ids

    ``client.ids`` must equal the private ``_ids`` list without being the
    same object, so mutating the returned list leaves ``_ids`` untouched.
    """
    ids = self.client.ids
    self.assertEqual(ids, self.client._ids)
    self.assertFalse(ids is self.client._ids)
    ids.remove(ids[-1])
    self.assertNotEqual(ids, self.client._ids)
MinRK
update API after sagedays29...
def test_queue_status(self):
    """Check the shape of ``queue_status`` for one engine and for all.

    A single-engine status is a dict with exactly the keys ``completed``,
    ``queue`` and ``tasks``. The global status is a dict keyed by engine
    id plus ``'unassigned'``, each engine entry having the same shape.
    """
    expected_keys = ['completed', 'queue', 'tasks']

    ids = self.client.ids
    id0 = ids[0]
    qs = self.client.queue_status(targets=id0)
    self.assertTrue(isinstance(qs, dict))
    self.assertEqual(sorted(qs.keys()), expected_keys)

    allqs = self.client.queue_status()
    self.assertTrue(isinstance(allqs, dict))
    intkeys = list(allqs.keys())
    intkeys.remove('unassigned')
    self.assertEqual(sorted(intkeys), sorted(self.client.ids))

    # drop the non-engine entry so only per-engine dicts remain
    allqs.pop('unassigned')
    for eid, qs in allqs.items():
        self.assertTrue(isinstance(qs, dict))
        self.assertEqual(sorted(qs.keys()), expected_keys)
MinRK
fix small client bugs + tests
r3650
MinRK
update API after sagedays29...
def test_shutdown(self):
    """Shutting down an engine removes it from the client.

    After ``shutdown`` the test polls until the engine id disappears from
    ``client.ids``; indexing the client with that id must then raise
    IndexError. The wait is bounded so that an engine which never
    unregisters fails the test instead of hanging the whole run.
    """
    ids = self.client.ids
    id0 = ids[0]
    self.client.shutdown(id0, block=True)

    # poll for up to 10 seconds (arbitrary but generous upper bound)
    deadline = time.time() + 10
    while id0 in self.client.ids and time.time() < deadline:
        time.sleep(0.1)
        self.client.spin()

    self.assertFalse(id0 in self.client.ids,
        "engine %r still registered after shutdown" % (id0,)
    )
    self.assertRaises(IndexError, lambda: self.client[id0])
MinRK
add message tracking to client, add/improve tests
r3654
MinRK
update API after sagedays29...
def test_result_status(self):
    """Placeholder test: the result_status checks are still to be written."""
MinRK
add db_query and hub_history tests to test_client
r3782
def test_db_query_dt(self):
    """test db query by date

    Takes the ``submitted`` value of the record in the middle of the hub
    history and checks that ``$lt`` / ``$gte`` queries on it partition the
    history, and that an equality query only returns matching records.
    """
    hist = self.client.hub_history()
    middle = self.client.db_query({'msg_id': hist[len(hist)//2]})[0]
    tic = middle['submitted']

    before = self.client.db_query({'submitted': {'$lt': tic}})
    after = self.client.db_query({'submitted': {'$gte': tic}})
    self.assertEqual(len(before) + len(after), len(hist))
    for rec in before:
        self.assertTrue(rec['submitted'] < tic)
    for rec in after:
        self.assertTrue(rec['submitted'] >= tic)

    same = self.client.db_query({'submitted': tic})
    for rec in same:
        self.assertTrue(rec['submitted'] == tic)
def test_db_query_keys(self):
    """test extracting subset of record keys

    Each returned record must carry exactly the requested keys plus
    ``msg_id``.
    """
    found = self.client.db_query({'msg_id': {'$ne': ''}}, keys=['submitted', 'completed'])
    expected = set(['msg_id', 'submitted', 'completed'])
    for rec in found:
        self.assertEqual(set(rec.keys()), expected)
MinRK
exclude buffers from default db_query...
def test_db_query_default_keys(self):
    """default db_query excludes buffers

    Without an explicit ``keys`` argument no record may contain the
    ``buffers`` or ``result_buffers`` entries.
    """
    records = self.client.db_query({'msg_id': {'$ne': ''}})
    for record in records:
        keys = set(record.keys())
        self.assertTrue('buffers' not in keys, "'buffers' should not be in: %s" % keys)
        self.assertTrue('result_buffers' not in keys, "'result_buffers' should not be in: %s" % keys)
MinRK
add db_query and hub_history tests to test_client
def test_db_query_msg_id(self):
    """ensure msg_id is always in db queries

    The same query is run with three different ``keys`` selections; every
    record must contain ``msg_id`` whether or not it was requested.
    """
    query = {'msg_id': {'$ne': ''}}
    key_selections = (
        ['submitted', 'completed'],
        ['submitted'],
        ['msg_id'],
    )
    for wanted in key_selections:
        for rec in self.client.db_query(query, keys=wanted):
            self.assertTrue('msg_id' in rec.keys())
MinRK
dictdb queries should [shallow] copy records...
def test_db_query_get_result(self):
    """pop in db_query shouldn't pop from result itself

    Runs a task, issues a db_query, then fetches the last result through
    a second client; the fetch must become ready within the wait.
    """
    self.client[:].apply_sync(lambda: 1)
    # the query is issued for its effect on the Hub; the records are not needed
    self.client.db_query({'msg_id': {'$ne': ''}})
    rc2 = clientmod.Client(profile='iptest')
    try:
        # If this bug is not fixed, this call will hang:
        ar = rc2.get_result(self.client.history[-1])
        ar.wait(2)
        self.assertTrue(ar.ready())
        ar.get()
    finally:
        # close the extra client even when the checks above fail
        rc2.close()
MinRK
add db_query and hub_history tests to test_client
def test_db_query_in(self):
    """test db query with '$in','$nin' operators

    Splits the hub history into even- and odd-indexed msg_ids: ``$in``
    on the even half must return exactly that half, and ``$nin`` on it
    must return exactly the odd half.
    """
    hist = self.client.hub_history()
    even = hist[::2]
    odd = hist[1::2]

    recs = self.client.db_query({'msg_id': {'$in': even}})
    found = [r['msg_id'] for r in recs]
    self.assertEqual(set(even), set(found))

    recs = self.client.db_query({'msg_id': {'$nin': even}})
    found = [r['msg_id'] for r in recs]
    self.assertEqual(set(odd), set(found))
def test_hub_history(self):
    """hub_history is ordered by submission time and ends with the newest task.

    Walks the history checking that ``submitted`` never decreases, then
    runs one more task and checks that its msg_id is the last entry.
    """
    hist = self.client.hub_history()
    recs = self.client.db_query({'msg_id': {"$ne": ''}})
    recdict = dict((rec['msg_id'], rec) for rec in recs)

    # any date earlier than every record works as the initial lower bound
    latest = datetime(1984, 1, 1)
    for msg_id in hist:
        newt = recdict[msg_id]['submitted']
        self.assertTrue(newt >= latest)
        latest = newt

    ar = self.client[-1].apply_async(lambda: 1)
    ar.get()
    # pause before re-reading the history
    time.sleep(0.25)
    self.assertEqual(self.client.hub_history()[-1:], ar.msg_ids)
MinRK
test resubmit of aborted tasks (#1647)
def _wait_for_idle(self):
    """wait for all engines to become idle, according to the Hub

    Polls ``queue_status`` up to 20 times, 100ms apart, until there are
    no unassigned tasks and no engine reports tasks. Afterwards asserts
    that this is what the Hub reports, so a timeout fails the test.
    """
    rc = self.client
    # timeout 2s, polling every 100ms
    for _ in range(20):
        qs = rc.queue_status()
        busy = qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids)
        if not busy:
            break
        time.sleep(0.1)

    # ensure Hub up to date:
    qs = rc.queue_status()
    self.assertEqual(qs['unassigned'], 0)
    for eid in rc.ids:
        self.assertEqual(qs[eid]['tasks'], 0)
MinRK
add Client.resubmit for re-running tasks...
def test_resubmit(self):
    """Resubmitting a finished task runs it again.

    The task returns a random number, so the result of the resubmitted
    run is expected to differ from the first one.
    """
    def f():
        import random
        return random.random()

    lbview = self.client.load_balanced_view()
    first = lbview.apply_async(f)
    first_value = first.get(1)

    # give the Hub a chance to notice:
    self._wait_for_idle()

    again = self.client.resubmit(first.msg_ids)
    second_value = again.get(1)
    self.assertFalse(first_value == second_value)
MinRK
test chained resubmissions
def test_resubmit_chain(self):
    """resubmit resubmitted tasks

    Builds a chain of ten resubmissions, each one resubmitting the
    previous link, and then waits for every result in the chain.
    """
    v = self.client.load_balanced_view()
    ar = v.apply_async(lambda x: x, 'x'*1024)
    ar.get()
    self._wait_for_idle()

    ars = [ar]
    for _ in range(10):
        resubmitted = self.client.resubmit(ars[-1].msg_ids)
        # Record the new link. Without this every iteration would
        # resubmit the original task and the results would never be
        # collected below.
        ars.append(resubmitted)

    for result in ars:
        result.get()
MinRK
preserve header for resubmitted tasks...
def test_resubmit_header(self):
    """resubmit shouldn't clobber the whole header

    Compares the headers of the original and the resubmitted task: only
    ``msg_id`` and ``date`` may differ, every other key must be equal.
    """
    def f():
        import random
        return random.random()

    v = self.client.load_balanced_view()
    v.retries = 1
    ar = v.apply_async(f)
    ar.get(1)
    # give the Hub a chance to notice:
    self._wait_for_idle()
    ahr = self.client.resubmit(ar.msg_ids)
    ahr.get(1)
    # pause before querying the task records
    time.sleep(0.5)

    records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
    h1, h2 = [r['header'] for r in records]
    for key in set(h1.keys()).union(set(h2.keys())):
        if key in ('msg_id', 'date'):
            self.assertNotEqual(h1[key], h2[key])
        else:
            self.assertEqual(h1[key], h2[key])
MinRK
test resubmit of aborted tasks (#1647)
def test_resubmit_aborted(self):
    """An aborted task can be resubmitted and then completes."""
    def f():
        import random
        return random.random()

    lbview = self.client.load_balanced_view()
    # restrict to one engine, so we can put a sleep
    # ahead of the task, so it will get aborted
    last_engine = self.client.ids[-1]
    lbview.targets = [last_engine]
    lbview.apply_async(time.sleep, 0.5)

    aborted = lbview.apply_async(f)
    aborted.abort()
    self.assertRaises(error.TaskAborted, aborted.get)

    # Give the Hub a chance to get up to date:
    self._wait_for_idle()
    retry = self.client.resubmit(aborted.msg_ids)
    retry.get(1)
MinRK
add Client.resubmit for re-running tasks...
def test_resubmit_inflight(self):
    """resubmit of inflight task

    The task is resubmitted shortly after submission, before its own
    result is fetched; both the original and the resubmission are then
    fetched with a two second timeout.
    """
    lbview = self.client.load_balanced_view()
    original = lbview.apply_async(time.sleep, 1)
    # give the message a chance to arrive
    time.sleep(0.2)
    duplicate = self.client.resubmit(original.msg_ids)
    original.get(2)
    duplicate.get(2)
MinRK
add Client.resubmit for re-running tasks...
r3874
def test_resubmit_badkey(self):
    """ensure KeyError on resubmit of nonexistent task"""
    unknown_ids = ['invalid']
    self.assertRaisesRemote(KeyError, self.client.resubmit, unknown_ids)
MinRK
various db backend fixes...
r3875
def test_purge_results(self):
    """Purging one msg_id shortens the hub history by exactly one entry."""
    # ensure there are some tasks
    for _ in range(5):
        self.client[:].apply_sync(lambda: 1)
    # Wait for the Hub to realise the result is done:
    # This prevents a race condition, where we
    # might purge a result the Hub still thinks is pending.
    time.sleep(0.1)
    rc2 = clientmod.Client(profile='iptest')
    try:
        hist = self.client.hub_history()
        ahr = rc2.get_result([hist[-1]])
        ahr.wait(10)
        self.client.purge_results(hist[-1])
        newhist = self.client.hub_history()
        self.assertEqual(len(newhist) + 1, len(hist))
        rc2.spin()
    finally:
        # close the extra client even when an assertion above fails
        rc2.close()
MinRK
fix purge_results for args other than specified msg_id...
r4146
def test_purge_all_results(self):
    """After ``purge_results('all')`` the hub history must be empty."""
    self.client.purge_results('all')
    hist = self.client.hub_history()
    self.assertEqual(len(hist), 0)
MinRK
add Client.spin_thread()...
r6484
def test_spin_thread(self):
    """With the spin thread started, a task's wall_time stays below 0.1s.

    The test sleeps for 0.1s after submitting, without touching the
    result, and then checks the ``wall_time`` reported for the task.
    """
    self.client.spin_thread(0.01)
    result = self.client[-1].apply_async(lambda: 1)
    time.sleep(0.1)
    elapsed = result.wall_time
    self.assertTrue(
        elapsed < 0.1,
        "spin should have kept wall_time < 0.1, but got %f" % elapsed
    )
def test_stop_spin_thread(self):
    """With the spin thread stopped again, wall_time exceeds 0.1s.

    The spin thread is started and stopped straight away; after a 0.15s
    sleep the task's ``wall_time`` is expected to be above 0.1s.
    """
    self.client.spin_thread(0.01)
    self.client.stop_spin_thread()
    result = self.client[-1].apply_async(lambda: 1)
    time.sleep(0.15)
    elapsed = result.wall_time
    self.assertTrue(
        elapsed > 0.1,
        "Shouldn't be spinning, but got wall_time=%f" % elapsed
    )