##// END OF EJS Templates
Merge pull request #8296 from minrk/rm-kernel...
Merge pull request #8296 from minrk/rm-kernel remove ipython_kernel

File last commit:

r20860:c3b763db
r21224:f9254a22 merge
Show More
test_client.py
550 lines | 20.0 KiB | text/x-python | PythonLexer
MinRK
pyout -> execute_result...
r16568 """Tests for parallel client.py"""
MinRK
update recently changed modules with Authors in docstring
r4018
MinRK
pyout -> execute_result...
r16568 # Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
MinRK
update API after sagedays29...
r3664
MinRK
update parallel code for py3k...
r4155 from __future__ import division
MinRK
added preliminary tests for zmq.parallel
r3595 import time
MinRK
add db_query and hub_history tests to test_client
r3782 from datetime import datetime
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
add message tracking to client, add/improve tests
r3654 import zmq
MinRK
some initial tests for newparallel
r3637
MinRK
update parallel magics...
r7476 from IPython import parallel
Min RK
s/IPython.parallel/ipython_parallel/
r20860 from ipython_parallel.client import client as clientmod
from ipython_parallel import error
from ipython_parallel import AsyncResult, AsyncHubResult
from ipython_parallel import LoadBalancedView, DirectView
MinRK
added preliminary tests for zmq.parallel
r3595
Thomas Kluyver
Use explicit relative imports...
r13347 from .clienttest import ClusterTestCase, segfault, wait, add_engines
MinRK
update API after sagedays29...
r3664
def setup():
    """Module-level fixture: calls ``add_engines(4, total=True)`` before the tests."""
    add_engines(4, total=True)
MinRK
added preliminary tests for zmq.parallel
r3595
class TestClient(ClusterTestCase):
def test_ids(self):
    """Adding two engines grows ``client.ids`` by exactly two entries."""
    initial_count = len(self.client.ids)
    self.add_engines(2)
    self.assertEqual(len(self.client.ids), initial_count + 2)
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
fix iteration over Client...
def test_iter(self):
    """Iterating the client yields views whose targets match ``client.ids``."""
    self.minimum_engines(4)
    seen_targets = []
    for view in self.client:
        seen_targets.append(view.targets)
    self.assertEqual(seen_targets, self.client.ids)
MinRK
added preliminary tests for zmq.parallel
def test_view_indexing(self):
    """Indexing the client (int, list, slice) builds views on the expected targets."""
    self.minimum_engines(4)
    targets = self.client._build_targets('all')[-1]

    # a full slice: only the targets are compared
    everything = self.client[:]
    self.assertEqual(everything.targets, targets)

    def expect_direct_view(key, expected):
        # index the client with `key`; the result must be a DirectView on `expected`
        view = self.client[key]
        self.assertTrue(isinstance(view, DirectView))
        self.assertEqual(view.targets, expected)

    one_id = self.client.ids[2]
    expect_direct_view(one_id, one_id)

    some_ids = self.client.ids[2:4]
    expect_direct_view(some_ids, some_ids)

    expect_direct_view(slice(None, None, 2), targets[::2])
    expect_direct_view(slice(1, None, 3), targets[1::3])
    expect_direct_view(slice(None, -3), targets[:-3])
    expect_direct_view(-1, targets[-1])

    # None is not a valid index
    self.assertRaises(TypeError, lambda : self.client[None])
MinRK
some initial tests for newparallel
r3637
MinRK
move IPython.zmq.parallel to IPython.parallel
def test_lbview_targets(self):
    """load_balanced_view() exposes the expected ``targets`` for each argument form."""
    make_view = self.client.load_balanced_view
    # no argument and 'all' both leave targets unset
    self.assertEqual(make_view().targets, None)
    # a single index is resolved to a one-element list of engine ids
    self.assertEqual(make_view(-1).targets, [self.client.ids[-1]])
    self.assertEqual(make_view('all').targets, None)
MinRK
allow rc.direct_view('all') to be lazily-evaluated...
r4492
def test_dview_targets(self):
    """direct_view() exposes the expected ``targets`` for each argument form."""
    make_view = self.client.direct_view
    # no argument and 'all' both keep the literal 'all'
    self.assertEqual(make_view().targets, 'all')
    self.assertEqual(make_view('all').targets, 'all')
    # a single index is resolved to that engine's id
    self.assertEqual(make_view(-1).targets, self.client.ids[-1])
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666
MinRK
handle targets='all' in remotefunction...
def test_lazy_all_targets(self):
    """A default direct view keeps ``targets == 'all'`` and reaches engines added later."""
    view = self.client.direct_view()
    self.assertEqual(view.targets, 'all')

    def double(x):
        return x*2

    seq = list(range(100))
    ref = list(map(double, seq))

    def check_all_engines(count):
        # simple apply: one result per engine
        self.assertEqual(view.apply_sync(lambda : 1), [1] * count)
        # map goes through remotefunction
        self.assertEqual(view.map_sync(double, seq), ref)

    # add some engines, which should be used
    self.add_engines(1)
    n1 = len(self.client.ids)
    check_all_engines(n1)

    # add a couple more engines, and try again
    self.add_engines(2)
    n2 = len(self.client.ids)
    self.assertNotEqual(n2, n1)
    check_all_engines(n2)
MinRK
handle targets='all' in remotefunction...
r5290
MinRK
some initial tests for newparallel
def test_targets(self):
    """``_build_targets(None)`` resolves to the client's current engine ids."""
    build = self.client._build_targets
    expected_ids = self.client.ids
    # build() returns a pair; only the second element (targets) is checked
    resolved = build(None)
    _, targets = resolved
    self.assertEqual(expected_ids, targets)
MinRK
some initial tests for newparallel
r3637
def test_clear(self):
    """clear() removes a pushed name from the targeted engine, then from all engines."""
    self.minimum_engines(2)
    everyone = self.client[:]
    everyone.block = True
    everyone.push(dict(a=5))
    everyone.pull('a')

    last_id = self.client.ids[-1]
    # clear only the last engine; fetching 'a' from the others is still attempted
    self.client.clear(targets=last_id, block=True)
    a = self.client[:-1].get('a')
    self.assertRaisesRemote(NameError, self.client[last_id].get, 'a')

    # clear with no targets given: every engine must now raise NameError
    self.client.clear(block=True)
    for engine_id in self.client.ids:
        self.assertRaisesRemote(NameError, self.client[engine_id].get, 'a')
MinRK
some initial tests for newparallel
r3637
MinRK
testing fixes
def test_get_result(self):
    """test getting results from the Hub.

    A task is submitted through a second client; the result is then
    fetched through ``self.client`` both with ``owner=False`` (expected to
    be an AsyncHubResult) and with the default arguments (expected not to be).
    """
    c = clientmod.Client(profile='iptest')
    try:
        t = c.ids[-1]
        ar = c[t].apply_async(wait, 1)
        # give the monitor time to notice the message
        time.sleep(.25)
        ahr = self.client.get_result(ar.msg_ids[0], owner=False)
        self.assertIsInstance(ahr, AsyncHubResult)
        self.assertEqual(ahr.get(), ar.get())
        ar2 = self.client.get_result(ar.msg_ids[0])
        self.assertNotIsInstance(ar2, AsyncHubResult)
        self.assertEqual(ahr.get(), ar2.get())
    finally:
        # always release the extra client, even when an assertion fails
        c.close()
MinRK
fix small client bugs + tests
r3650
MinRK
fix & test HubResults from execute requests
def test_get_execute_result(self):
    """test getting execute results from the Hub.

    Code is executed through a second client; the result is then fetched
    through ``self.client`` both with ``owner=False`` (expected to be an
    AsyncHubResult) and with the default arguments (expected not to be).
    """
    c = clientmod.Client(profile='iptest')
    try:
        t = c.ids[-1]
        # NOTE(review): the executed code ends in a statement, so
        # execute_result is presumably empty on both sides -- consider
        # executing code that ends in an expression; confirm intent.
        ar = c[t].execute("import time; time.sleep(1)", silent=False)
        # give the monitor time to notice the message
        time.sleep(.25)
        ahr = self.client.get_result(ar.msg_ids[0], owner=False)
        self.assertIsInstance(ahr, AsyncHubResult)
        self.assertEqual(ahr.get().execute_result, ar.get().execute_result)
        ar2 = self.client.get_result(ar.msg_ids[0])
        self.assertNotIsInstance(ar2, AsyncHubResult)
        self.assertEqual(ahr.get(), ar2.get())
    finally:
        # always release the extra client, even when an assertion fails
        c.close()
MinRK
fix small client bugs + tests
def test_ids_list(self):
    """``client.ids`` equals ``client._ids`` but is a distinct, independently mutable list."""
    snapshot = self.client.ids
    self.assertEqual(snapshot, self.client._ids)
    self.assertTrue(snapshot is not self.client._ids)
    # mutating the returned list must not touch the client's own list
    snapshot.remove(snapshot[-1])
    self.assertNotEqual(snapshot, self.client._ids)
MinRK
fix small client bugs + tests
r3650
MinRK
update API after sagedays29...
def test_queue_status(self):
    """queue_status() returns per-engine status dicts plus an 'unassigned' entry."""
    expected_fields = ['completed', 'queue', 'tasks']

    # status of a single engine
    first_id = self.client.ids[0]
    single = self.client.queue_status(targets=first_id)
    self.assertTrue(isinstance(single, dict))
    self.assertEqual(sorted(single.keys()), expected_fields)

    # status of everything: keys are the engine ids plus 'unassigned'
    allqs = self.client.queue_status()
    self.assertTrue(isinstance(allqs, dict))
    intkeys = list(allqs.keys())
    intkeys.remove('unassigned')
    # debugging output for an occasional failure of this test
    print("intkeys", intkeys)
    intkeys = sorted(intkeys)
    ids = self.client.ids
    print("client.ids", ids)
    ids = sorted(self.client.ids)
    self.assertEqual(intkeys, ids)

    # drop the aggregate entry, then every remaining value is a per-engine dict
    allqs.pop('unassigned')
    for engine_status in allqs.values():
        self.assertTrue(isinstance(engine_status, dict))
        self.assertEqual(sorted(engine_status.keys()), expected_fields)
MinRK
fix small client bugs + tests
r3650
MinRK
update API after sagedays29...
def test_shutdown(self):
    """Shutting down an engine removes its id from the client.

    After ``shutdown(id0, block=True)`` the client is polled until ``id0``
    is gone from ``client.ids``; indexing the client with that id must
    then raise IndexError.
    """
    ids = self.client.ids
    id0 = ids[0]
    self.client.shutdown(id0, block=True)
    # Poll every 100ms, but give up after a bounded wait so that an engine
    # which never unregisters fails this test instead of hanging the run.
    timeout = 30
    deadline = time.time() + timeout
    while id0 in self.client.ids:
        if time.time() > deadline:
            self.fail("engine %r still registered %is after shutdown" % (id0, timeout))
        time.sleep(0.1)
        self.client.spin()

    self.assertRaises(IndexError, lambda : self.client[id0])
MinRK
add message tracking to client, add/improve tests
r3654
MinRK
update API after sagedays29...
def test_result_status(self):
    """Placeholder with no assertions yet."""
    # to be written
    return None
MinRK
add db_query and hub_history tests to test_client
r3782
def test_db_query_dt(self):
    """Split the Hub's records around a middle task's 'submitted' time via db_query."""
    hist = self.client.hub_history()
    pivot_id = hist[len(hist)//2]
    pivot_record = self.client.db_query({'msg_id' : pivot_id})[0]
    tic = pivot_record['submitted']

    earlier = self.client.db_query({'submitted' : {'$lt' : tic}})
    later = self.client.db_query({'submitted' : {'$gte' : tic}})
    # the two halves together must account for the whole history
    self.assertEqual(len(earlier)+len(later), len(hist))
    for rec in earlier:
        self.assertTrue(rec['submitted'] < tic)
    for rec in later:
        self.assertTrue(rec['submitted'] >= tic)

    # exact-match query on the timestamp
    for rec in self.client.db_query({'submitted' : tic}):
        self.assertTrue(rec['submitted'] == tic)
def test_db_query_keys(self):
    """Requesting a subset of keys returns exactly those keys plus 'msg_id'."""
    requested = ['submitted', 'completed']
    records = self.client.db_query({'msg_id': {'$ne' : ''}}, keys=requested)
    expected = set(['msg_id', 'submitted', 'completed'])
    for rec in records:
        self.assertEqual(set(rec.keys()), expected)
MinRK
add db_query and hub_history tests to test_client
r3782
MinRK
exclude buffers from default db_query...
def test_db_query_default_keys(self):
    """Without explicit keys, db_query records carry no buffer fields."""
    records = self.client.db_query({'msg_id': {'$ne' : ''}})
    for rec in records:
        keys = set(rec.keys())
        has_buffers = 'buffers' in keys
        self.assertFalse(has_buffers, "'buffers' should not be in: %s" % keys)
        has_result_buffers = 'result_buffers' in keys
        self.assertFalse(has_result_buffers, "'result_buffers' should not be in: %s" % keys)
MinRK
add db_query and hub_history tests to test_client
def test_db_query_msg_id(self):
    """Every db_query record contains 'msg_id', whichever keys were requested."""
    key_selections = (
        ['submitted', 'completed'],
        ['submitted'],
        ['msg_id'],
    )
    for requested in key_selections:
        records = self.client.db_query({'msg_id': {'$ne' : ''}}, keys=requested)
        for rec in records:
            self.assertTrue('msg_id' in rec.keys())
MinRK
dictdb queries should [shallow] copy records...
def test_db_query_get_result(self):
    """pop in db_query shouldn't pop from result itself

    Runs a task, issues a db_query, and then checks that a second client
    can still fetch the latest result from the Hub.
    """
    self.client[:].apply_sync(lambda : 1)
    # the records are not inspected; issuing the query is the point
    self.client.db_query({'msg_id': {'$ne' : ''}})
    rc2 = clientmod.Client(profile='iptest')
    try:
        # If this bug is not fixed, this call will hang:
        ar = rc2.get_result(self.client.history[-1])
        ar.wait(2)
        self.assertTrue(ar.ready())
        ar.get()
    finally:
        # always release the extra client, even when an assertion fails
        rc2.close()
MinRK
add db_query and hub_history tests to test_client
def test_db_query_in(self):
    """db_query with '$in' / '$nin' selects the listed / the remaining msg_ids."""
    hist = self.client.hub_history()
    even = hist[::2]
    odd = hist[1::2]

    def matching_ids(operator):
        # run the query with the given operator against the even-indexed ids
        records = self.client.db_query({ 'msg_id' : {operator : even}})
        return [ rec['msg_id'] for rec in records ]

    self.assertEqual(set(even), set(matching_ids('$in')))
    self.assertEqual(set(odd), set(matching_ids('$nin')))
MinRK
add db_query and hub_history tests to test_client
r3782
def test_hub_history(self):
    """hub_history() is ordered by submission time and ends with the newest task."""
    hist = self.client.hub_history()
    recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
    by_id = dict((rec['msg_id'], rec) for rec in recs)

    # walk the history: 'submitted' must never go backwards
    previous = datetime(1984,1,1)
    for msg_id in hist:
        submitted = by_id[msg_id]['submitted']
        self.assertTrue(submitted >= previous)
        previous = submitted

    # a fresh task must show up as the last history entry
    ar = self.client[-1].apply_async(lambda : 1)
    ar.get()
    time.sleep(0.25)
    self.assertEqual(self.client.hub_history()[-1:], ar.msg_ids)
MinRK
add db_query and hub_history tests to test_client
r3782
MinRK
test resubmit of aborted tasks (#1647)
def _wait_for_idle(self):
    """Wait until the client, the Hub history and the queue status all look idle.

    Each polling phase gives up after 50 polls of 100ms; the assertions
    afterwards fail the test if the cluster is still busy.
    """
    rc = self.client
    # step 0. wait for local results
    # this should be sufficient 99% of the time.
    rc.wait(timeout=5)

    # step 1. wait for all requests to be noticed
    # timeout 5s, polling every 100ms
    msg_ids = set(rc.history)
    hub_hist = rc.hub_history()
    for _ in range(50):
        if not msg_ids.difference(hub_hist):
            break
        time.sleep(0.1)
        hub_hist = rc.hub_history()
    self.assertEqual(len(msg_ids.difference(hub_hist)), 0)

    # step 2. wait for all requests to be done
    # timeout 5s, polling every 100ms
    def busy(status):
        # anything unassigned, or any engine with tasks or queued work
        if status['unassigned']:
            return True
        return any(status[eid]['tasks'] + status[eid]['queue']
                   for eid in status if eid != 'unassigned')

    qs = rc.queue_status()
    for _ in range(50):
        if not busy(qs):
            break
        time.sleep(0.1)
        qs = rc.queue_status()

    # ensure Hub up to date:
    self.assertEqual(qs['unassigned'], 0)
    for eid in qs:
        if eid == 'unassigned':
            continue
        self.assertEqual(qs[eid]['tasks'], 0)
        self.assertEqual(qs[eid]['queue'], 0)
MinRK
test resubmit of aborted tasks (#1647)
r6808
MinRK
add Client.resubmit for re-running tasks...
def test_resubmit(self):
    """A resubmitted task is run again, so a random result differs from the first run."""
    def f():
        import random
        return random.random()

    view = self.client.load_balanced_view()
    first = view.apply_async(f)
    r1 = first.get(1)
    # give the Hub a chance to notice:
    self._wait_for_idle()
    second = self.client.resubmit(first.msg_ids)
    r2 = second.get(1)
    self.assertFalse(r1 == r2)
MinRK
test chained resubmissions
def test_resubmit_chain(self):
    """resubmit resubmitted tasks

    Builds a chain of ten resubmissions, each one resubmitting the
    previous link, then waits for every result in the chain.
    """
    v = self.client.load_balanced_view()
    ar = v.apply_async(lambda x: x, 'x'*1024)
    ar.get()
    self._wait_for_idle()
    ars = [ar]

    for i in range(10):
        # resubmit the newest link and make the resubmission the new tail;
        # without the append, only the original task is ever resubmitted
        ars.append(self.client.resubmit(ars[-1].msg_ids))

    for ar in ars:
        ar.get()
MinRK
preserve header for resubmitted tasks...
def test_resubmit_header(self):
    """A resubmitted task keeps its header, apart from 'msg_id' and 'date'."""
    def f():
        import random
        return random.random()

    view = self.client.load_balanced_view()
    view.retries = 1
    ar = view.apply_async(f)
    r1 = ar.get(1)
    # give the Hub a chance to notice:
    self._wait_for_idle()
    ahr = self.client.resubmit(ar.msg_ids)
    ahr.get(1)
    time.sleep(0.5)

    both_ids = ar.msg_ids + ahr.msg_ids
    records = self.client.db_query({'msg_id': {'$in': both_ids}}, keys='header')
    h1, h2 = [ rec['header'] for rec in records ]
    # these two fields must differ between the runs; all others must match
    expected_to_differ = ('msg_id', 'date')
    for key in set(h1.keys()) | set(h2.keys()):
        if key in expected_to_differ:
            compare = self.assertNotEqual
        else:
            compare = self.assertEqual
        compare(h1[key], h2[key])
MinRK
preserve header for resubmitted tasks...
r7262
MinRK
test resubmit of aborted tasks (#1647)
def test_resubmit_aborted(self):
    """A task that was aborted can be resubmitted and its result fetched."""
    def f():
        import random
        return random.random()

    view = self.client.load_balanced_view()
    # restrict to one engine, so we can put a sleep
    # ahead of the task, so it will get aborted
    last_id = self.client.ids[-1]
    view.targets = [last_id]
    blocker = view.apply_async(time.sleep, 0.5)
    ar = view.apply_async(f)
    ar.abort()
    self.assertRaises(error.TaskAborted, ar.get)
    # Give the Hub a chance to get up to date:
    self._wait_for_idle()
    ahr = self.client.resubmit(ar.msg_ids)
    result = ahr.get(1)
MinRK
add Client.resubmit for re-running tasks...
def test_resubmit_inflight(self):
    """A task can be resubmitted while the original is still running."""
    view = self.client.load_balanced_view()
    original = view.apply_async(time.sleep,1)
    # give the message a chance to arrive
    time.sleep(0.2)
    resubmitted = self.client.resubmit(original.msg_ids)
    # both the original and the resubmission must finish within 2s each
    original.get(2)
    resubmitted.get(2)
MinRK
add Client.resubmit for re-running tasks...
r3874
def test_resubmit_badkey(self):
    """Resubmitting an unknown msg_id raises a remote KeyError."""
    unknown_ids = ['invalid']
    self.assertRaisesRemote(KeyError, self.client.resubmit, unknown_ids)
MinRK
various db backend fixes...
r3875
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
def test_purge_hub_results(self):
    """Purging one msg_id from the Hub shortens hub_history() by exactly one."""
    # ensure there are some tasks
    for i in range(5):
        self.client[:].apply_sync(lambda : 1)
    # Wait for the Hub to realise the result is done:
    # This prevents a race condition, where we
    # might purge a result the Hub still thinks is pending.
    self._wait_for_idle()
    rc2 = clientmod.Client(profile='iptest')
    try:
        hist = self.client.hub_history()
        ahr = rc2.get_result([hist[-1]])
        ahr.wait(10)
        self.client.purge_hub_results(hist[-1])
        newhist = self.client.hub_history()
        self.assertEqual(len(newhist)+1,len(hist))
        rc2.spin()
    finally:
        # always release the extra client, even when an assertion fails
        rc2.close()
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410
def test_purge_local_results(self):
    """Purging one AsyncResult locally drops its entries from results and metadata."""
    # ensure there are some tasks
    res = []
    for i in range(5):
        res.append(self.client[:].apply_async(lambda : 1))
    self._wait_for_idle()
    self.client.wait(10) # wait for the results to come back

    before = len(self.client.results)
    self.assertEqual(len(self.client.metadata), before)

    newest = res[-1]
    self.client.purge_local_results(newest)
    # one entry per message in the purged result should be gone
    self.assertEqual(len(self.client.results), before-len(newest), msg="Not removed from results")
    self.assertEqual(len(self.client.metadata), before-len(newest), msg="Not removed from metadata")
MinRK
test purging local results while some are outstanding
r13918
def test_purge_local_results_outstanding(self):
    """Finished results can be purged locally; outstanding ones raise RuntimeError."""
    view = self.client[-1]
    finished = view.apply_async(lambda : 1)
    msg_id = finished.msg_ids[0]
    finished.owner = False
    finished.get()
    self._wait_for_idle()

    # start a slow task so something is outstanding during the purges below
    pending = view.apply_async(time.sleep, 1)

    self.assertIn(msg_id, self.client.results)
    self.assertIn(msg_id, self.client.metadata)
    self.client.purge_local_results(finished)
    self.assertNotIn(msg_id, self.client.results)
    self.assertNotIn(msg_id, self.client.metadata)

    # purging the still-running task is refused until it has finished
    self.assertRaises(RuntimeError, self.client.purge_local_results, pending)
    pending.get()
    self.client.purge_local_results(pending)
def test_purge_all_local_results_outstanding(self):
    """purge_local_results('all') raises RuntimeError while a task is outstanding."""
    view = self.client[-1]
    pending = view.apply_async(time.sleep, 1)
    self.assertRaises(RuntimeError, self.client.purge_local_results, 'all')
    # once the task has finished, the same purge goes through
    pending.get()
    self.client.purge_local_results('all')
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
def test_purge_all_hub_results(self):
    """purge_hub_results('all') leaves the Hub history empty."""
    self.client.purge_hub_results('all')
    remaining = self.client.hub_history()
    self.assertEqual(len(remaining), 0)
def test_purge_all_local_results(self):
    """purge_local_results('all') empties the client's results and metadata."""
    self.client.purge_local_results('all')
    n_results = len(self.client.results)
    self.assertEqual(n_results, 0, msg="Results not empty")
    n_metadata = len(self.client.metadata)
    self.assertEqual(n_metadata, 0, msg="metadata not empty")
MinRK
fix purge_results for args other than specified msg_id...
def test_purge_all_results(self):
    """purge_results('all') empties local results, local metadata and the Hub history."""
    # ensure there are some tasks
    everyone = None
    for i in range(5):
        everyone = self.client[:]
        everyone.apply_sync(lambda : 1)
    self.client.wait(10)
    self._wait_for_idle()

    self.client.purge_results('all')

    n_results = len(self.client.results)
    self.assertEqual(n_results, 0, msg="Results not empty")
    n_metadata = len(self.client.metadata)
    self.assertEqual(n_metadata, 0, msg="metadata not empty")
    hist = self.client.hub_history()
    self.assertEqual(len(hist), 0, msg="hub history not empty")
def test_purge_everything(self):
    """purge_everything() empties results, metadata, client bookkeeping and Hub history."""
    # ensure there are some tasks
    everyone = None
    for i in range(5):
        everyone = self.client[:]
        everyone.apply_sync(lambda : 1)
    self.client.wait(10)
    self._wait_for_idle()

    self.client.purge_everything()

    # The client results
    n_results = len(self.client.results)
    self.assertEqual(n_results, 0, msg="Results not empty")
    n_metadata = len(self.client.metadata)
    self.assertEqual(n_metadata, 0, msg="metadata not empty")
    # The client "bookkeeping"
    n_digests = len(self.client.session.digest_history)
    self.assertEqual(n_digests, 0, msg="session digest not empty")
    n_history = len(self.client.history)
    self.assertEqual(n_history, 0, msg="client history not empty")
    # the hub results
    hist = self.client.hub_history()
    self.assertEqual(len(hist), 0, msg="hub history not empty")
Jan Schulz
Add Testcases for new purging methods in parallel.Client...
r8410
MinRK
add Client.spin_thread()...
r6484
def test_spin_thread(self):
    """After spin_thread(), 'received' metadata is filled in without an explicit spin."""
    self.client.spin_thread(0.01)
    ar = self.client[-1].apply_async(lambda : 1)
    md = self.client.metadata[ar.msg_ids[0]]
    # 3s timeout, 100ms poll
    polls = 0
    while polls < 30:
        time.sleep(0.1)
        if md['received'] is not None:
            break
        polls += 1
    self.assertIsInstance(md['received'], datetime)
MinRK
add Client.spin_thread()...
r6484
def test_stop_spin_thread(self):
    """After stop_spin_thread(), 'received' metadata stays None without an explicit spin."""
    self.client.spin_thread(0.01)
    self.client.stop_spin_thread()
    ar = self.client[-1].apply_async(lambda : 1)
    md = self.client.metadata[ar.msg_ids[0]]
    # wait 500ms in 100ms steps, then check that nothing was received
    for i in range(5):
        time.sleep(0.1)
    # assertIsNone takes (obj, msg=None); the original passed a stray None as msg
    self.assertIsNone(md['received'])
MinRK
add Client.spin_thread()...
r6484
MinRK
update parallel magics...
def test_activate(self):
    """activate() makes suffixed px magics available and returns a view on the targets."""
    ip = get_ipython()
    magics = ip.magics_manager.magics
    kinds = ('line', 'cell')

    # the unsuffixed magics are expected up front
    for kind in kinds:
        self.assertTrue('px' in magics[kind])

    # suffix '0' on the last engine
    v0 = self.client.activate(-1, '0')
    for kind in kinds:
        self.assertTrue('px0' in magics[kind])
    self.assertEqual(v0.targets, self.client.ids[-1])

    # suffix 'all' on all engines
    v0 = self.client.activate('all', 'all')
    for kind in kinds:
        self.assertTrue('pxall' in magics[kind])
    self.assertEqual(v0.targets, 'all')