##// END OF EJS Templates
Remove commented line
Remove commented line

File last commit:

r15372:5ce8451d
r15957:da0cb932
Show More
test_client.py
546 lines | 19.9 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """Tests for parallel client.py
Authors:
* Min RK
"""
MinRK
update API after sagedays29...
r3664
#-------------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
MinRK
update parallel code for py3k...
r4155 from __future__ import division
MinRK
added preliminary tests for zmq.parallel
r3595 import time
MinRK
add db_query and hub_history tests to test_client
r3782 from datetime import datetime
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
add message tracking to client, add/improve tests
r3654 import zmq
MinRK
some initial tests for newparallel
r3637
MinRK
update parallel magics...
r7476 from IPython import parallel
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.client import client as clientmod
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from IPython.parallel import error
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel import AsyncResult, AsyncHubResult
from IPython.parallel import LoadBalancedView, DirectView
MinRK
added preliminary tests for zmq.parallel
r3595
Thomas Kluyver
Use explicit relative imports...
r13347 from .clienttest import ClusterTestCase, segfault, wait, add_engines
MinRK
update API after sagedays29...
r3664
def setup():
MinRK
expedite IPython.parallel tests...
r6162 add_engines(4, total=True)
MinRK
added preliminary tests for zmq.parallel
r3595
class TestClient(ClusterTestCase):
def test_ids(self):
MinRK
testing fixes
r3641 n = len(self.client.ids)
MinRK
expedite IPython.parallel tests...
r6162 self.add_engines(2)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(len(self.client.ids), n+2)
MinRK
added preliminary tests for zmq.parallel
r3595
def test_view_indexing(self):
MinRK
testing fixes
r3641 """test index access for views"""
MinRK
expedite IPython.parallel tests...
r6162 self.minimum_engines(4)
MinRK
added preliminary tests for zmq.parallel
r3595 targets = self.client._build_targets('all')[-1]
v = self.client[:]
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, targets)
MinRK
some initial tests for newparallel
r3637 t = self.client.ids[2]
v = self.client[t]
Bradley M. Froehle
s/assert_/assertTrue/
r7876 self.assertTrue(isinstance(v, DirectView))
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, t)
MinRK
some initial tests for newparallel
r3637 t = self.client.ids[2:4]
v = self.client[t]
Bradley M. Froehle
s/assert_/assertTrue/
r7876 self.assertTrue(isinstance(v, DirectView))
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, t)
MinRK
some initial tests for newparallel
r3637 v = self.client[::2]
Bradley M. Froehle
s/assert_/assertTrue/
r7876 self.assertTrue(isinstance(v, DirectView))
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, targets[::2])
MinRK
some initial tests for newparallel
r3637 v = self.client[1::3]
Bradley M. Froehle
s/assert_/assertTrue/
r7876 self.assertTrue(isinstance(v, DirectView))
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, targets[1::3])
MinRK
some initial tests for newparallel
r3637 v = self.client[:-3]
Bradley M. Froehle
s/assert_/assertTrue/
r7876 self.assertTrue(isinstance(v, DirectView))
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, targets[:-3])
MinRK
fix/test pushed function globals
r3638 v = self.client[-1]
Bradley M. Froehle
s/assert_/assertTrue/
r7876 self.assertTrue(isinstance(v, DirectView))
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, targets[-1])
MinRK
pyzmq-2.1.3 related testing adjustments
r3661 self.assertRaises(TypeError, lambda : self.client[None])
MinRK
some initial tests for newparallel
r3637
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 def test_lbview_targets(self):
"""test load_balanced_view targets"""
v = self.client.load_balanced_view()
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, None)
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 v = self.client.load_balanced_view(-1)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, [self.client.ids[-1]])
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 v = self.client.load_balanced_view('all')
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, None)
MinRK
allow rc.direct_view('all') to be lazily-evaluated...
r4492
def test_dview_targets(self):
MinRK
handle targets='all' in remotefunction...
r5290 """test direct_view targets"""
MinRK
allow rc.direct_view('all') to be lazily-evaluated...
r4492 v = self.client.direct_view()
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, 'all')
MinRK
allow rc.direct_view('all') to be lazily-evaluated...
r4492 v = self.client.direct_view('all')
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, 'all')
MinRK
allow rc.direct_view('all') to be lazily-evaluated...
r4492 v = self.client.direct_view(-1)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, self.client.ids[-1])
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666
MinRK
handle targets='all' in remotefunction...
r5290 def test_lazy_all_targets(self):
"""test lazy evaluation of rc.direct_view('all')"""
v = self.client.direct_view()
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v.targets, 'all')
MinRK
handle targets='all' in remotefunction...
r5290
def double(x):
return x*2
Thomas Kluyver
Fix parallel test suite
r13383 seq = list(range(100))
MinRK
handle targets='all' in remotefunction...
r5290 ref = [ double(x) for x in seq ]
# add some engines, which should be used
MinRK
expedite IPython.parallel tests...
r6162 self.add_engines(1)
MinRK
handle targets='all' in remotefunction...
r5290 n1 = len(self.client.ids)
# simple apply
r = v.apply_sync(lambda : 1)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(r, [1] * n1)
MinRK
handle targets='all' in remotefunction...
r5290
# map goes through remotefunction
r = v.map_sync(double, seq)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(r, ref)
MinRK
handle targets='all' in remotefunction...
r5290
# add a couple more engines, and try again
self.add_engines(2)
n2 = len(self.client.ids)
Bradley M. Froehle
s/assertNotEquals/assertNotEqual/
r7878 self.assertNotEqual(n2, n1)
MinRK
handle targets='all' in remotefunction...
r5290
# apply
r = v.apply_sync(lambda : 1)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(r, [1] * n2)
MinRK
handle targets='all' in remotefunction...
r5290
# map
r = v.map_sync(double, seq)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(r, ref)
MinRK
handle targets='all' in remotefunction...
r5290
MinRK
some initial tests for newparallel
r3637 def test_targets(self):
"""test various valid targets arguments"""
MinRK
testing fixes
r3641 build = self.client._build_targets
ids = self.client.ids
idents,targets = build(None)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(ids, targets)
MinRK
some initial tests for newparallel
r3637
def test_clear(self):
"""test clear behavior"""
MinRK
expedite IPython.parallel tests...
r6162 self.minimum_engines(2)
MinRK
update API after sagedays29...
r3664 v = self.client[:]
v.block=True
v.push(dict(a=5))
v.pull('a')
MinRK
testing fixes
r3641 id0 = self.client.ids[-1]
MinRK
Start each test with clear engine namespaces...
r3775 self.client.clear(targets=id0, block=True)
a = self.client[:-1].get('a')
MinRK
update API after sagedays29...
r3664 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
self.client.clear(block=True)
MinRK
testing fixes
r3641 for i in self.client.ids:
MinRK
update API after sagedays29...
r3664 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
MinRK
some initial tests for newparallel
r3637
MinRK
testing fixes
r3641 def test_get_result(self):
"""test getting results from the Hub."""
c = clientmod.Client(profile='iptest')
MinRK
pyzmq-2.1.3 related testing adjustments
r3661 t = c.ids[-1]
MinRK
update API after sagedays29...
r3664 ar = c[t].apply_async(wait, 1)
MinRK
update connections and diagrams for reduced sockets
r3658 # give the monitor time to notice the message
MinRK
testing fixes
r3641 time.sleep(.25)
MinRK
update client.get_result to match AsyncResult behavior
r11416 ahr = self.client.get_result(ar.msg_ids[0])
MinRK
testing fixes
r3641 self.assertTrue(isinstance(ahr, AsyncHubResult))
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(ahr.get(), ar.get())
MinRK
update client.get_result to match AsyncResult behavior
r11416 ar2 = self.client.get_result(ar.msg_ids[0])
MinRK
testing fixes
r3641 self.assertFalse(isinstance(ar2, AsyncHubResult))
MinRK
update API after sagedays29...
r3664 c.close()
MinRK
fix small client bugs + tests
r3650
MinRK
fix & test HubResults from execute requests
r7508 def test_get_execute_result(self):
"""test getting execute results from the Hub."""
c = clientmod.Client(profile='iptest')
t = c.ids[-1]
cell = '\n'.join([
'import time',
'time.sleep(0.25)',
'5'
])
ar = c[t].execute("import time; time.sleep(1)", silent=False)
# give the monitor time to notice the message
time.sleep(.25)
MinRK
update client.get_result to match AsyncResult behavior
r11416 ahr = self.client.get_result(ar.msg_ids[0])
MinRK
fix & test HubResults from execute requests
r7508 self.assertTrue(isinstance(ahr, AsyncHubResult))
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(ahr.get().pyout, ar.get().pyout)
MinRK
update client.get_result to match AsyncResult behavior
r11416 ar2 = self.client.get_result(ar.msg_ids[0])
MinRK
fix & test HubResults from execute requests
r7508 self.assertFalse(isinstance(ar2, AsyncHubResult))
c.close()
MinRK
fix small client bugs + tests
r3650 def test_ids_list(self):
"""test client.ids"""
ids = self.client.ids
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(ids, self.client._ids)
MinRK
fix small client bugs + tests
r3650 self.assertFalse(ids is self.client._ids)
ids.remove(ids[-1])
Bradley M. Froehle
s/assertNotEquals/assertNotEqual/
r7878 self.assertNotEqual(ids, self.client._ids)
MinRK
fix small client bugs + tests
r3650
MinRK
update API after sagedays29...
r3664 def test_queue_status(self):
ids = self.client.ids
id0 = ids[0]
qs = self.client.queue_status(targets=id0)
self.assertTrue(isinstance(qs, dict))
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
MinRK
update API after sagedays29...
r3664 allqs = self.client.queue_status()
self.assertTrue(isinstance(allqs, dict))
MinRK
update parallel code for py3k...
r4155 intkeys = list(allqs.keys())
intkeys.remove('unassigned')
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(sorted(intkeys), sorted(self.client.ids))
MinRK
fix out of sync parallel tests...
r3701 unassigned = allqs.pop('unassigned')
MinRK
update API after sagedays29...
r3664 for eid,qs in allqs.items():
self.assertTrue(isinstance(qs, dict))
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
MinRK
fix small client bugs + tests
r3650
MinRK
update API after sagedays29...
r3664 def test_shutdown(self):
ids = self.client.ids
id0 = ids[0]
self.client.shutdown(id0, block=True)
while id0 in self.client.ids:
time.sleep(0.1)
self.client.spin()
MinRK
add message tracking to client, add/improve tests
r3654
MinRK
update API after sagedays29...
r3664 self.assertRaises(IndexError, lambda : self.client[id0])
MinRK
add message tracking to client, add/improve tests
r3654
MinRK
update API after sagedays29...
r3664 def test_result_status(self):
pass
# to be written
MinRK
add db_query and hub_history tests to test_client
r3782
def test_db_query_dt(self):
"""test db query by date"""
hist = self.client.hub_history()
MinRK
update parallel code for py3k...
r4155 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
MinRK
add db_query and hub_history tests to test_client
r3782 tic = middle['submitted']
before = self.client.db_query({'submitted' : {'$lt' : tic}})
after = self.client.db_query({'submitted' : {'$gte' : tic}})
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(len(before)+len(after),len(hist))
MinRK
add db_query and hub_history tests to test_client
r3782 for b in before:
self.assertTrue(b['submitted'] < tic)
for a in after:
self.assertTrue(a['submitted'] >= tic)
same = self.client.db_query({'submitted' : tic})
for s in same:
self.assertTrue(s['submitted'] == tic)
def test_db_query_keys(self):
"""test extracting subset of record keys"""
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
for rec in found:
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
MinRK
add db_query and hub_history tests to test_client
r3782
MinRK
exclude buffers from default db_query...
r6094 def test_db_query_default_keys(self):
"""default db_query excludes buffers"""
found = self.client.db_query({'msg_id': {'$ne' : ''}})
for rec in found:
keys = set(rec.keys())
self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
MinRK
add db_query and hub_history tests to test_client
r3782 def test_db_query_msg_id(self):
"""ensure msg_id is always in db queries"""
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
for rec in found:
self.assertTrue('msg_id' in rec.keys())
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
for rec in found:
self.assertTrue('msg_id' in rec.keys())
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
for rec in found:
self.assertTrue('msg_id' in rec.keys())
MinRK
dictdb queries should [shallow] copy records...
r6323 def test_db_query_get_result(self):
"""pop in db_query shouldn't pop from result itself"""
self.client[:].apply_sync(lambda : 1)
found = self.client.db_query({'msg_id': {'$ne' : ''}})
rc2 = clientmod.Client(profile='iptest')
# If this bug is not fixed, this call will hang:
ar = rc2.get_result(self.client.history[-1])
ar.wait(2)
self.assertTrue(ar.ready())
ar.get()
rc2.close()
MinRK
add db_query and hub_history tests to test_client
r3782 def test_db_query_in(self):
"""test db query with '$in','$nin' operators"""
hist = self.client.hub_history()
even = hist[::2]
odd = hist[1::2]
recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
found = [ r['msg_id'] for r in recs ]
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(set(even), set(found))
MinRK
add db_query and hub_history tests to test_client
r3782 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
found = [ r['msg_id'] for r in recs ]
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(set(odd), set(found))
MinRK
add db_query and hub_history tests to test_client
r3782
def test_hub_history(self):
hist = self.client.hub_history()
recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
recdict = {}
for rec in recs:
recdict[rec['msg_id']] = rec
latest = datetime(1984,1,1)
for msg_id in hist:
rec = recdict[msg_id]
newt = rec['submitted']
self.assertTrue(newt >= latest)
latest = newt
ar = self.client[-1].apply_async(lambda : 1)
ar.get()
time.sleep(0.25)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(self.client.hub_history()[-1:],ar.msg_ids)
MinRK
add db_query and hub_history tests to test_client
r3782
MinRK
test resubmit of aborted tasks (#1647)
r6808 def _wait_for_idle(self):
MinRK
wait for empty queues as well as tasks...
r13530 """wait for the cluster to become idle, according to the everyone."""
MinRK
test resubmit of aborted tasks (#1647)
r6808 rc = self.client
MinRK
wait for empty queues as well as tasks...
r13530 # step 0. wait for local results
# this should be sufficient 99% of the time.
rc.wait(timeout=5)
MinRK
improve patience for slow Hub in client tests...
r9164 # step 1. wait for all requests to be noticed
# timeout 5s, polling every 100ms
msg_ids = set(rc.history)
hub_hist = rc.hub_history()
for i in range(50):
if msg_ids.difference(hub_hist):
time.sleep(0.1)
hub_hist = rc.hub_history()
else:
break
self.assertEqual(len(msg_ids.difference(hub_hist)), 0)
# step 2. wait for all requests to be done
MinRK
minor tweak to wait_for_idle in tests
r7500 # timeout 5s, polling every 100ms
qs = rc.queue_status()
for i in range(50):
MinRK
don't use lazily-evaluated rc.ids in wait_for_idle...
r13673 if qs['unassigned'] or any(qs[eid]['tasks'] + qs[eid]['queue'] for eid in qs if eid != 'unassigned'):
MinRK
test resubmit of aborted tasks (#1647)
r6808 time.sleep(0.1)
MinRK
minor tweak to wait_for_idle in tests
r7500 qs = rc.queue_status()
MinRK
test resubmit of aborted tasks (#1647)
r6808 else:
break
# ensure Hub up to date:
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(qs['unassigned'], 0)
MinRK
fix typo in PR #4744
r13974 for eid in [ eid for eid in qs if eid != 'unassigned' ]:
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(qs[eid]['tasks'], 0)
MinRK
wait for empty queues as well as tasks...
r13530 self.assertEqual(qs[eid]['queue'], 0)
MinRK
test resubmit of aborted tasks (#1647)
r6808
MinRK
add Client.resubmit for re-running tasks...
r3874 def test_resubmit(self):
def f():
import random
return random.random()
v = self.client.load_balanced_view()
ar = v.apply_async(f)
r1 = ar.get(1)
MinRK
improve reliability of occasionally failing parallel tests
r4469 # give the Hub a chance to notice:
MinRK
test resubmit of aborted tasks (#1647)
r6808 self._wait_for_idle()
MinRK
add Client.resubmit for re-running tasks...
r3874 ahr = self.client.resubmit(ar.msg_ids)
r2 = ahr.get(1)
self.assertFalse(r1 == r2)
MinRK
test chained resubmissions
r7295 def test_resubmit_chain(self):
"""resubmit resubmitted tasks"""
v = self.client.load_balanced_view()
ar = v.apply_async(lambda x: x, 'x'*1024)
ar.get()
self._wait_for_idle()
ars = [ar]
for i in range(10):
ar = ars[-1]
ar2 = self.client.resubmit(ar.msg_ids)
[ ar.get() for ar in ars ]
MinRK
preserve header for resubmitted tasks...
r7262 def test_resubmit_header(self):
"""resubmit shouldn't clobber the whole header"""
def f():
import random
return random.random()
v = self.client.load_balanced_view()
v.retries = 1
ar = v.apply_async(f)
r1 = ar.get(1)
# give the Hub a chance to notice:
self._wait_for_idle()
ahr = self.client.resubmit(ar.msg_ids)
ahr.get(1)
time.sleep(0.5)
records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
h1,h2 = [ r['header'] for r in records ]
for key in set(h1.keys()).union(set(h2.keys())):
if key in ('msg_id', 'date'):
Bradley M. Froehle
s/assertNotEquals/assertNotEqual/
r7878 self.assertNotEqual(h1[key], h2[key])
MinRK
preserve header for resubmitted tasks...
r7262 else:
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(h1[key], h2[key])
MinRK
preserve header for resubmitted tasks...
r7262
MinRK
test resubmit of aborted tasks (#1647)
r6808 def test_resubmit_aborted(self):
def f():
import random
return random.random()
v = self.client.load_balanced_view()
# restrict to one engine, so we can put a sleep
# ahead of the task, so it will get aborted
eid = self.client.ids[-1]
v.targets = [eid]
sleep = v.apply_async(time.sleep, 0.5)
ar = v.apply_async(f)
ar.abort()
self.assertRaises(error.TaskAborted, ar.get)
# Give the Hub a chance to get up to date:
self._wait_for_idle()
ahr = self.client.resubmit(ar.msg_ids)
r2 = ahr.get(1)
MinRK
add Client.resubmit for re-running tasks...
r3874 def test_resubmit_inflight(self):
MinRK
resubmitted tasks are now wholly separate (new msg_ids)...
r6817 """resubmit of inflight task"""
MinRK
add Client.resubmit for re-running tasks...
r3874 v = self.client.load_balanced_view()
ar = v.apply_async(time.sleep,1)
# give the message a chance to arrive
time.sleep(0.2)
MinRK
resubmitted tasks are now wholly separate (new msg_ids)...
r6817 ahr = self.client.resubmit(ar.msg_ids)
MinRK
add Client.resubmit for re-running tasks...
r3874 ar.get(2)
MinRK
resubmitted tasks are now wholly separate (new msg_ids)...
r6817 ahr.get(2)
MinRK
add Client.resubmit for re-running tasks...
r3874
def test_resubmit_badkey(self):
"""ensure KeyError on resubmit of nonexistant task"""
self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
MinRK
various db backend fixes...
r3875
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410 def test_purge_hub_results(self):
MinRK
fix purge_results for args other than specified msg_id...
r4146 # ensure there are some tasks
for i in range(5):
self.client[:].apply_sync(lambda : 1)
MinRK
prevent race condition in purge_results test...
r4173 # Wait for the Hub to realise the result is done:
# This prevents a race condition, where we
# might purge a result the Hub still thinks is pending.
MinRK
improve patience for slow Hub in client tests...
r9164 self._wait_for_idle()
MinRK
prevent race condition in purge_results test...
r4173 rc2 = clientmod.Client(profile='iptest')
MinRK
various db backend fixes...
r3875 hist = self.client.hub_history()
MinRK
prevent race condition in purge_results test...
r4173 ahr = rc2.get_result([hist[-1]])
ahr.wait(10)
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410 self.client.purge_hub_results(hist[-1])
MinRK
various db backend fixes...
r3875 newhist = self.client.hub_history()
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(len(newhist)+1,len(hist))
MinRK
make sure extra client in purge_results test is closed...
r4176 rc2.spin()
rc2.close()
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410
def test_purge_local_results(self):
# ensure there are some tasks
res = []
for i in range(5):
res.append(self.client[:].apply_async(lambda : 1))
MinRK
improve patience for slow Hub in client tests...
r9164 self._wait_for_idle()
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410 self.client.wait(10) # wait for the results to come back
before = len(self.client.results)
self.assertEqual(len(self.client.metadata),before)
self.client.purge_local_results(res[-1])
self.assertEqual(len(self.client.results),before-len(res[-1]), msg="Not removed from results")
self.assertEqual(len(self.client.metadata),before-len(res[-1]), msg="Not removed from metadata")
MinRK
test purging local results while some are outstanding
r13918
def test_purge_local_results_outstanding(self):
v = self.client[-1]
ar = v.apply_async(lambda : 1)
msg_id = ar.msg_ids[0]
ar.get()
self._wait_for_idle()
ar2 = v.apply_async(time.sleep, 1)
self.assertIn(msg_id, self.client.results)
self.assertIn(msg_id, self.client.metadata)
self.client.purge_local_results(ar)
self.assertNotIn(msg_id, self.client.results)
self.assertNotIn(msg_id, self.client.metadata)
with self.assertRaises(RuntimeError):
self.client.purge_local_results(ar2)
ar2.get()
self.client.purge_local_results(ar2)
def test_purge_all_local_results_outstanding(self):
v = self.client[-1]
ar = v.apply_async(time.sleep, 1)
with self.assertRaises(RuntimeError):
self.client.purge_local_results('all')
ar.get()
self.client.purge_local_results('all')
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410 def test_purge_all_hub_results(self):
self.client.purge_hub_results('all')
hist = self.client.hub_history()
self.assertEqual(len(hist), 0)
def test_purge_all_local_results(self):
self.client.purge_local_results('all')
self.assertEqual(len(self.client.results), 0, msg="Results not empty")
self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
MinRK
fix purge_results for args other than specified msg_id...
r4146 def test_purge_all_results(self):
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410 # ensure there are some tasks
for i in range(5):
self.client[:].apply_sync(lambda : 1)
self.client.wait(10)
MinRK
improve patience for slow Hub in client tests...
r9164 self._wait_for_idle()
MinRK
fix purge_results for args other than specified msg_id...
r4146 self.client.purge_results('all')
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410 self.assertEqual(len(self.client.results), 0, msg="Results not empty")
self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
MinRK
improve patience for slow Hub in client tests...
r9164 hist = self.client.hub_history()
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410 self.assertEqual(len(hist), 0, msg="hub history not empty")
def test_purge_everything(self):
# ensure there are some tasks
for i in range(5):
self.client[:].apply_sync(lambda : 1)
self.client.wait(10)
MinRK
improve patience for slow Hub in client tests...
r9164 self._wait_for_idle()
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410 self.client.purge_everything()
# The client results
self.assertEqual(len(self.client.results), 0, msg="Results not empty")
self.assertEqual(len(self.client.metadata), 0, msg="metadata not empty")
# The client "bookkeeping"
self.assertEqual(len(self.client.session.digest_history), 0, msg="session digest not empty")
self.assertEqual(len(self.client.history), 0, msg="client history not empty")
MinRK
improve patience for slow Hub in client tests...
r9164 # the hub results
hist = self.client.hub_history()
self.assertEqual(len(hist), 0, msg="hub history not empty")
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410
MinRK
add Client.spin_thread()...
r6484
def test_spin_thread(self):
self.client.spin_thread(0.01)
ar = self.client[-1].apply_async(lambda : 1)
time.sleep(0.1)
self.assertTrue(ar.wall_time < 0.1,
"spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
)
def test_stop_spin_thread(self):
self.client.spin_thread(0.01)
self.client.stop_spin_thread()
ar = self.client[-1].apply_async(lambda : 1)
time.sleep(0.15)
self.assertTrue(ar.wall_time > 0.1,
"Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
)
MinRK
update parallel magics...
r7476 def test_activate(self):
ip = get_ipython()
magics = ip.magics_manager.magics
self.assertTrue('px' in magics['line'])
self.assertTrue('px' in magics['cell'])
v0 = self.client.activate(-1, '0')
self.assertTrue('px0' in magics['line'])
self.assertTrue('px0' in magics['cell'])
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v0.targets, self.client.ids[-1])
MinRK
update parallel magics...
r7476 v0 = self.client.activate('all', 'all')
self.assertTrue('pxall' in magics['line'])
self.assertTrue('pxall' in magics['cell'])
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(v0.targets, 'all')