##// END OF EJS Templates
handle datetime objects in Session...
handle datetime objects in Session also validate packer/unpacker

File last commit:

r3875:ffe043d4
r4008:bdc1136a
Show More
test_client.py
244 lines | 8.9 KiB | text/x-python | PythonLexer
MinRK
update API after sagedays29...
r3664 """Tests for parallel client.py"""
#-------------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
MinRK
added preliminary tests for zmq.parallel
r3595 import time
MinRK
add db_query and hub_history tests to test_client
r3782 from datetime import datetime
MinRK
fix small client bugs + tests
r3650 from tempfile import mktemp
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
add message tracking to client, add/improve tests
r3654 import zmq
MinRK
some initial tests for newparallel
r3637
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.client import client as clientmod
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 from IPython.parallel import error
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel import AsyncResult, AsyncHubResult
from IPython.parallel import LoadBalancedView, DirectView
MinRK
added preliminary tests for zmq.parallel
r3595
MinRK
update API after sagedays29...
r3664 from clienttest import ClusterTestCase, segfault, wait, add_engines
def setup():
add_engines(4)
MinRK
added preliminary tests for zmq.parallel
r3595
class TestClient(ClusterTestCase):
def test_ids(self):
MinRK
testing fixes
r3641 n = len(self.client.ids)
MinRK
added preliminary tests for zmq.parallel
r3595 self.add_engines(3)
MinRK
testing fixes
r3641 self.assertEquals(len(self.client.ids), n+3)
MinRK
added preliminary tests for zmq.parallel
r3595
def test_view_indexing(self):
MinRK
testing fixes
r3641 """test index access for views"""
self.add_engines(2)
MinRK
added preliminary tests for zmq.parallel
r3595 targets = self.client._build_targets('all')[-1]
v = self.client[:]
self.assertEquals(v.targets, targets)
MinRK
some initial tests for newparallel
r3637 t = self.client.ids[2]
v = self.client[t]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, t)
t = self.client.ids[2:4]
v = self.client[t]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, t)
v = self.client[::2]
self.assert_(isinstance(v, DirectView))
MinRK
added preliminary tests for zmq.parallel
r3595 self.assertEquals(v.targets, targets[::2])
MinRK
some initial tests for newparallel
r3637 v = self.client[1::3]
self.assert_(isinstance(v, DirectView))
MinRK
added preliminary tests for zmq.parallel
r3595 self.assertEquals(v.targets, targets[1::3])
MinRK
some initial tests for newparallel
r3637 v = self.client[:-3]
self.assert_(isinstance(v, DirectView))
MinRK
added preliminary tests for zmq.parallel
r3595 self.assertEquals(v.targets, targets[:-3])
MinRK
fix/test pushed function globals
r3638 v = self.client[-1]
self.assert_(isinstance(v, DirectView))
self.assertEquals(v.targets, targets[-1])
MinRK
pyzmq-2.1.3 related testing adjustments
r3661 self.assertRaises(TypeError, lambda : self.client[None])
MinRK
some initial tests for newparallel
r3637
MinRK
move IPython.zmq.parallel to IPython.parallel
r3666 def test_lbview_targets(self):
"""test load_balanced_view targets"""
v = self.client.load_balanced_view()
self.assertEquals(v.targets, None)
v = self.client.load_balanced_view(-1)
self.assertEquals(v.targets, [self.client.ids[-1]])
v = self.client.load_balanced_view('all')
self.assertEquals(v.targets, self.client.ids)
MinRK
some initial tests for newparallel
r3637 def test_targets(self):
"""test various valid targets arguments"""
MinRK
testing fixes
r3641 build = self.client._build_targets
ids = self.client.ids
idents,targets = build(None)
self.assertEquals(ids, targets)
MinRK
some initial tests for newparallel
r3637
def test_clear(self):
"""test clear behavior"""
MinRK
update API after sagedays29...
r3664 # self.add_engines(2)
v = self.client[:]
v.block=True
v.push(dict(a=5))
v.pull('a')
MinRK
testing fixes
r3641 id0 = self.client.ids[-1]
MinRK
Start each test with clear engine namespaces...
r3775 self.client.clear(targets=id0, block=True)
a = self.client[:-1].get('a')
MinRK
update API after sagedays29...
r3664 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
self.client.clear(block=True)
MinRK
testing fixes
r3641 for i in self.client.ids:
MinRK
update API after sagedays29...
r3664 # print i
self.assertRaisesRemote(NameError, self.client[i].get, 'a')
MinRK
some initial tests for newparallel
r3637
MinRK
testing fixes
r3641 def test_get_result(self):
"""test getting results from the Hub."""
c = clientmod.Client(profile='iptest')
MinRK
update API after sagedays29...
r3664 # self.add_engines(1)
MinRK
pyzmq-2.1.3 related testing adjustments
r3661 t = c.ids[-1]
MinRK
update API after sagedays29...
r3664 ar = c[t].apply_async(wait, 1)
MinRK
update connections and diagrams for reduced sockets
r3658 # give the monitor time to notice the message
MinRK
testing fixes
r3641 time.sleep(.25)
ahr = self.client.get_result(ar.msg_ids)
self.assertTrue(isinstance(ahr, AsyncHubResult))
self.assertEquals(ahr.get(), ar.get())
ar2 = self.client.get_result(ar.msg_ids)
self.assertFalse(isinstance(ar2, AsyncHubResult))
MinRK
update API after sagedays29...
r3664 c.close()
MinRK
fix small client bugs + tests
r3650
def test_ids_list(self):
"""test client.ids"""
MinRK
update API after sagedays29...
r3664 # self.add_engines(2)
MinRK
fix small client bugs + tests
r3650 ids = self.client.ids
self.assertEquals(ids, self.client._ids)
self.assertFalse(ids is self.client._ids)
ids.remove(ids[-1])
self.assertNotEquals(ids, self.client._ids)
MinRK
update API after sagedays29...
r3664 def test_queue_status(self):
# self.addEngine(4)
ids = self.client.ids
id0 = ids[0]
qs = self.client.queue_status(targets=id0)
self.assertTrue(isinstance(qs, dict))
self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
allqs = self.client.queue_status()
self.assertTrue(isinstance(allqs, dict))
MinRK
fix out of sync parallel tests...
r3701 self.assertEquals(sorted(allqs.keys()), sorted(self.client.ids + ['unassigned']))
unassigned = allqs.pop('unassigned')
MinRK
update API after sagedays29...
r3664 for eid,qs in allqs.items():
self.assertTrue(isinstance(qs, dict))
self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
MinRK
fix small client bugs + tests
r3650
MinRK
update API after sagedays29...
r3664 def test_shutdown(self):
# self.addEngine(4)
ids = self.client.ids
id0 = ids[0]
self.client.shutdown(id0, block=True)
while id0 in self.client.ids:
time.sleep(0.1)
self.client.spin()
MinRK
add message tracking to client, add/improve tests
r3654
MinRK
update API after sagedays29...
r3664 self.assertRaises(IndexError, lambda : self.client[id0])
MinRK
add message tracking to client, add/improve tests
r3654
MinRK
update API after sagedays29...
r3664 def test_result_status(self):
pass
# to be written
MinRK
add db_query and hub_history tests to test_client
r3782
def test_db_query_dt(self):
"""test db query by date"""
hist = self.client.hub_history()
middle = self.client.db_query({'msg_id' : hist[len(hist)/2]})[0]
tic = middle['submitted']
before = self.client.db_query({'submitted' : {'$lt' : tic}})
after = self.client.db_query({'submitted' : {'$gte' : tic}})
self.assertEquals(len(before)+len(after),len(hist))
for b in before:
self.assertTrue(b['submitted'] < tic)
for a in after:
self.assertTrue(a['submitted'] >= tic)
same = self.client.db_query({'submitted' : tic})
for s in same:
self.assertTrue(s['submitted'] == tic)
def test_db_query_keys(self):
"""test extracting subset of record keys"""
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
for rec in found:
self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
def test_db_query_msg_id(self):
"""ensure msg_id is always in db queries"""
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
for rec in found:
self.assertTrue('msg_id' in rec.keys())
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
for rec in found:
self.assertTrue('msg_id' in rec.keys())
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
for rec in found:
self.assertTrue('msg_id' in rec.keys())
def test_db_query_in(self):
"""test db query with '$in','$nin' operators"""
hist = self.client.hub_history()
even = hist[::2]
odd = hist[1::2]
recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
found = [ r['msg_id'] for r in recs ]
self.assertEquals(set(even), set(found))
recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
found = [ r['msg_id'] for r in recs ]
self.assertEquals(set(odd), set(found))
def test_hub_history(self):
hist = self.client.hub_history()
recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
recdict = {}
for rec in recs:
recdict[rec['msg_id']] = rec
latest = datetime(1984,1,1)
for msg_id in hist:
rec = recdict[msg_id]
newt = rec['submitted']
self.assertTrue(newt >= latest)
latest = newt
ar = self.client[-1].apply_async(lambda : 1)
ar.get()
time.sleep(0.25)
self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
MinRK
add Client.resubmit for re-running tasks...
r3874 def test_resubmit(self):
def f():
import random
return random.random()
v = self.client.load_balanced_view()
ar = v.apply_async(f)
r1 = ar.get(1)
ahr = self.client.resubmit(ar.msg_ids)
r2 = ahr.get(1)
self.assertFalse(r1 == r2)
def test_resubmit_inflight(self):
"""ensure ValueError on resubmit of inflight task"""
v = self.client.load_balanced_view()
ar = v.apply_async(time.sleep,1)
# give the message a chance to arrive
time.sleep(0.2)
self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
ar.get(2)
def test_resubmit_badkey(self):
"""ensure KeyError on resubmit of nonexistant task"""
self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
MinRK
various db backend fixes...
r3875
def test_purge_results(self):
hist = self.client.hub_history()
self.client.purge_results(hist)
newhist = self.client.hub_history()
self.assertTrue(len(newhist) == 0)