test_client.py
408 lines
| 14.3 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4018 | """Tests for parallel client.py | ||
Authors: | ||||
* Min RK | ||||
""" | ||||
MinRK
|
r3664 | |||
#------------------------------------------------------------------------------- | ||||
# Copyright (C) 2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
MinRK
|
r4155 | from __future__ import division | ||
MinRK
|
r3595 | import time | ||
MinRK
|
r3782 | from datetime import datetime | ||
MinRK
|
r3650 | from tempfile import mktemp | ||
MinRK
|
r3595 | |||
MinRK
|
r3654 | import zmq | ||
MinRK
|
r3637 | |||
MinRK
|
r3673 | from IPython.parallel.client import client as clientmod | ||
MinRK
|
r3666 | from IPython.parallel import error | ||
MinRK
|
r3673 | from IPython.parallel import AsyncResult, AsyncHubResult | ||
from IPython.parallel import LoadBalancedView, DirectView | ||||
MinRK
|
r3595 | |||
MinRK
|
r3664 | from clienttest import ClusterTestCase, segfault, wait, add_engines | ||
def setup(): | ||||
MinRK
|
r6162 | add_engines(4, total=True) | ||
MinRK
|
r3595 | |||
class TestClient(ClusterTestCase): | ||||
def test_ids(self): | ||||
MinRK
|
r3641 | n = len(self.client.ids) | ||
MinRK
|
r6162 | self.add_engines(2) | ||
self.assertEquals(len(self.client.ids), n+2) | ||||
MinRK
|
r3595 | |||
def test_view_indexing(self): | ||||
MinRK
|
r3641 | """test index access for views""" | ||
MinRK
|
r6162 | self.minimum_engines(4) | ||
MinRK
|
r3595 | targets = self.client._build_targets('all')[-1] | ||
v = self.client[:] | ||||
self.assertEquals(v.targets, targets) | ||||
MinRK
|
r3637 | t = self.client.ids[2] | ||
v = self.client[t] | ||||
self.assert_(isinstance(v, DirectView)) | ||||
self.assertEquals(v.targets, t) | ||||
t = self.client.ids[2:4] | ||||
v = self.client[t] | ||||
self.assert_(isinstance(v, DirectView)) | ||||
self.assertEquals(v.targets, t) | ||||
v = self.client[::2] | ||||
self.assert_(isinstance(v, DirectView)) | ||||
MinRK
|
r3595 | self.assertEquals(v.targets, targets[::2]) | ||
MinRK
|
r3637 | v = self.client[1::3] | ||
self.assert_(isinstance(v, DirectView)) | ||||
MinRK
|
r3595 | self.assertEquals(v.targets, targets[1::3]) | ||
MinRK
|
r3637 | v = self.client[:-3] | ||
self.assert_(isinstance(v, DirectView)) | ||||
MinRK
|
r3595 | self.assertEquals(v.targets, targets[:-3]) | ||
MinRK
|
r3638 | v = self.client[-1] | ||
self.assert_(isinstance(v, DirectView)) | ||||
self.assertEquals(v.targets, targets[-1]) | ||||
MinRK
|
r3661 | self.assertRaises(TypeError, lambda : self.client[None]) | ||
MinRK
|
r3637 | |||
MinRK
|
r3666 | def test_lbview_targets(self): | ||
"""test load_balanced_view targets""" | ||||
v = self.client.load_balanced_view() | ||||
self.assertEquals(v.targets, None) | ||||
v = self.client.load_balanced_view(-1) | ||||
self.assertEquals(v.targets, [self.client.ids[-1]]) | ||||
v = self.client.load_balanced_view('all') | ||||
MinRK
|
r4492 | self.assertEquals(v.targets, None) | ||
def test_dview_targets(self): | ||||
MinRK
|
r5290 | """test direct_view targets""" | ||
MinRK
|
r4492 | v = self.client.direct_view() | ||
self.assertEquals(v.targets, 'all') | ||||
v = self.client.direct_view('all') | ||||
self.assertEquals(v.targets, 'all') | ||||
v = self.client.direct_view(-1) | ||||
self.assertEquals(v.targets, self.client.ids[-1]) | ||||
MinRK
|
r3666 | |||
MinRK
|
r5290 | def test_lazy_all_targets(self): | ||
"""test lazy evaluation of rc.direct_view('all')""" | ||||
v = self.client.direct_view() | ||||
self.assertEquals(v.targets, 'all') | ||||
def double(x): | ||||
return x*2 | ||||
seq = range(100) | ||||
ref = [ double(x) for x in seq ] | ||||
# add some engines, which should be used | ||||
MinRK
|
r6162 | self.add_engines(1) | ||
MinRK
|
r5290 | n1 = len(self.client.ids) | ||
# simple apply | ||||
r = v.apply_sync(lambda : 1) | ||||
self.assertEquals(r, [1] * n1) | ||||
# map goes through remotefunction | ||||
r = v.map_sync(double, seq) | ||||
self.assertEquals(r, ref) | ||||
# add a couple more engines, and try again | ||||
self.add_engines(2) | ||||
n2 = len(self.client.ids) | ||||
self.assertNotEquals(n2, n1) | ||||
# apply | ||||
r = v.apply_sync(lambda : 1) | ||||
self.assertEquals(r, [1] * n2) | ||||
# map | ||||
r = v.map_sync(double, seq) | ||||
self.assertEquals(r, ref) | ||||
MinRK
|
r3637 | def test_targets(self): | ||
"""test various valid targets arguments""" | ||||
MinRK
|
r3641 | build = self.client._build_targets | ||
ids = self.client.ids | ||||
idents,targets = build(None) | ||||
self.assertEquals(ids, targets) | ||||
MinRK
|
r3637 | |||
def test_clear(self): | ||||
"""test clear behavior""" | ||||
MinRK
|
r6162 | self.minimum_engines(2) | ||
MinRK
|
r3664 | v = self.client[:] | ||
v.block=True | ||||
v.push(dict(a=5)) | ||||
v.pull('a') | ||||
MinRK
|
r3641 | id0 = self.client.ids[-1] | ||
MinRK
|
r3775 | self.client.clear(targets=id0, block=True) | ||
a = self.client[:-1].get('a') | ||||
MinRK
|
r3664 | self.assertRaisesRemote(NameError, self.client[id0].get, 'a') | ||
self.client.clear(block=True) | ||||
MinRK
|
r3641 | for i in self.client.ids: | ||
MinRK
|
r3664 | self.assertRaisesRemote(NameError, self.client[i].get, 'a') | ||
MinRK
|
r3637 | |||
MinRK
|
r3641 | def test_get_result(self): | ||
"""test getting results from the Hub.""" | ||||
c = clientmod.Client(profile='iptest') | ||||
MinRK
|
r3661 | t = c.ids[-1] | ||
MinRK
|
r3664 | ar = c[t].apply_async(wait, 1) | ||
MinRK
|
r3658 | # give the monitor time to notice the message | ||
MinRK
|
r3641 | time.sleep(.25) | ||
ahr = self.client.get_result(ar.msg_ids) | ||||
self.assertTrue(isinstance(ahr, AsyncHubResult)) | ||||
self.assertEquals(ahr.get(), ar.get()) | ||||
ar2 = self.client.get_result(ar.msg_ids) | ||||
self.assertFalse(isinstance(ar2, AsyncHubResult)) | ||||
MinRK
|
r3664 | c.close() | ||
MinRK
|
r3650 | |||
def test_ids_list(self): | ||||
"""test client.ids""" | ||||
ids = self.client.ids | ||||
self.assertEquals(ids, self.client._ids) | ||||
self.assertFalse(ids is self.client._ids) | ||||
ids.remove(ids[-1]) | ||||
self.assertNotEquals(ids, self.client._ids) | ||||
MinRK
|
r3664 | def test_queue_status(self): | ||
ids = self.client.ids | ||||
id0 = ids[0] | ||||
qs = self.client.queue_status(targets=id0) | ||||
self.assertTrue(isinstance(qs, dict)) | ||||
self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) | ||||
allqs = self.client.queue_status() | ||||
self.assertTrue(isinstance(allqs, dict)) | ||||
MinRK
|
r4155 | intkeys = list(allqs.keys()) | ||
intkeys.remove('unassigned') | ||||
self.assertEquals(sorted(intkeys), sorted(self.client.ids)) | ||||
MinRK
|
r3701 | unassigned = allqs.pop('unassigned') | ||
MinRK
|
r3664 | for eid,qs in allqs.items(): | ||
self.assertTrue(isinstance(qs, dict)) | ||||
self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks']) | ||||
MinRK
|
r3650 | |||
MinRK
|
r3664 | def test_shutdown(self): | ||
ids = self.client.ids | ||||
id0 = ids[0] | ||||
self.client.shutdown(id0, block=True) | ||||
while id0 in self.client.ids: | ||||
time.sleep(0.1) | ||||
self.client.spin() | ||||
MinRK
|
r3654 | |||
MinRK
|
r3664 | self.assertRaises(IndexError, lambda : self.client[id0]) | ||
MinRK
|
r3654 | |||
MinRK
|
r3664 | def test_result_status(self): | ||
pass | ||||
# to be written | ||||
MinRK
|
r3782 | |||
def test_db_query_dt(self): | ||||
"""test db query by date""" | ||||
hist = self.client.hub_history() | ||||
MinRK
|
r4155 | middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0] | ||
MinRK
|
r3782 | tic = middle['submitted'] | ||
before = self.client.db_query({'submitted' : {'$lt' : tic}}) | ||||
after = self.client.db_query({'submitted' : {'$gte' : tic}}) | ||||
self.assertEquals(len(before)+len(after),len(hist)) | ||||
for b in before: | ||||
self.assertTrue(b['submitted'] < tic) | ||||
for a in after: | ||||
self.assertTrue(a['submitted'] >= tic) | ||||
same = self.client.db_query({'submitted' : tic}) | ||||
for s in same: | ||||
self.assertTrue(s['submitted'] == tic) | ||||
def test_db_query_keys(self): | ||||
"""test extracting subset of record keys""" | ||||
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed']) | ||||
for rec in found: | ||||
self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed'])) | ||||
MinRK
|
r6094 | def test_db_query_default_keys(self): | ||
"""default db_query excludes buffers""" | ||||
found = self.client.db_query({'msg_id': {'$ne' : ''}}) | ||||
for rec in found: | ||||
keys = set(rec.keys()) | ||||
self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys) | ||||
self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys) | ||||
MinRK
|
r3782 | def test_db_query_msg_id(self): | ||
"""ensure msg_id is always in db queries""" | ||||
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed']) | ||||
for rec in found: | ||||
self.assertTrue('msg_id' in rec.keys()) | ||||
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted']) | ||||
for rec in found: | ||||
self.assertTrue('msg_id' in rec.keys()) | ||||
found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id']) | ||||
for rec in found: | ||||
self.assertTrue('msg_id' in rec.keys()) | ||||
MinRK
|
r6323 | def test_db_query_get_result(self): | ||
"""pop in db_query shouldn't pop from result itself""" | ||||
self.client[:].apply_sync(lambda : 1) | ||||
found = self.client.db_query({'msg_id': {'$ne' : ''}}) | ||||
rc2 = clientmod.Client(profile='iptest') | ||||
# If this bug is not fixed, this call will hang: | ||||
ar = rc2.get_result(self.client.history[-1]) | ||||
ar.wait(2) | ||||
self.assertTrue(ar.ready()) | ||||
ar.get() | ||||
rc2.close() | ||||
MinRK
|
r3782 | def test_db_query_in(self): | ||
"""test db query with '$in','$nin' operators""" | ||||
hist = self.client.hub_history() | ||||
even = hist[::2] | ||||
odd = hist[1::2] | ||||
recs = self.client.db_query({ 'msg_id' : {'$in' : even}}) | ||||
found = [ r['msg_id'] for r in recs ] | ||||
self.assertEquals(set(even), set(found)) | ||||
recs = self.client.db_query({ 'msg_id' : {'$nin' : even}}) | ||||
found = [ r['msg_id'] for r in recs ] | ||||
self.assertEquals(set(odd), set(found)) | ||||
def test_hub_history(self): | ||||
hist = self.client.hub_history() | ||||
recs = self.client.db_query({ 'msg_id' : {"$ne":''}}) | ||||
recdict = {} | ||||
for rec in recs: | ||||
recdict[rec['msg_id']] = rec | ||||
latest = datetime(1984,1,1) | ||||
for msg_id in hist: | ||||
rec = recdict[msg_id] | ||||
newt = rec['submitted'] | ||||
self.assertTrue(newt >= latest) | ||||
latest = newt | ||||
ar = self.client[-1].apply_async(lambda : 1) | ||||
ar.get() | ||||
time.sleep(0.25) | ||||
self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids) | ||||
MinRK
|
r6808 | def _wait_for_idle(self): | ||
"""wait for an engine to become idle, according to the Hub""" | ||||
rc = self.client | ||||
# timeout 2s, polling every 100ms | ||||
for i in range(20): | ||||
qs = rc.queue_status() | ||||
if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids): | ||||
time.sleep(0.1) | ||||
else: | ||||
break | ||||
# ensure Hub up to date: | ||||
qs = rc.queue_status() | ||||
self.assertEquals(qs['unassigned'], 0) | ||||
for eid in rc.ids: | ||||
self.assertEquals(qs[eid]['tasks'], 0) | ||||
MinRK
|
r3874 | def test_resubmit(self): | ||
def f(): | ||||
import random | ||||
return random.random() | ||||
v = self.client.load_balanced_view() | ||||
ar = v.apply_async(f) | ||||
r1 = ar.get(1) | ||||
MinRK
|
r4469 | # give the Hub a chance to notice: | ||
MinRK
|
r6808 | self._wait_for_idle() | ||
MinRK
|
r3874 | ahr = self.client.resubmit(ar.msg_ids) | ||
r2 = ahr.get(1) | ||||
self.assertFalse(r1 == r2) | ||||
MinRK
|
r7262 | def test_resubmit_header(self): | ||
"""resubmit shouldn't clobber the whole header""" | ||||
def f(): | ||||
import random | ||||
return random.random() | ||||
v = self.client.load_balanced_view() | ||||
v.retries = 1 | ||||
ar = v.apply_async(f) | ||||
r1 = ar.get(1) | ||||
# give the Hub a chance to notice: | ||||
self._wait_for_idle() | ||||
ahr = self.client.resubmit(ar.msg_ids) | ||||
ahr.get(1) | ||||
time.sleep(0.5) | ||||
records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header') | ||||
h1,h2 = [ r['header'] for r in records ] | ||||
for key in set(h1.keys()).union(set(h2.keys())): | ||||
if key in ('msg_id', 'date'): | ||||
self.assertNotEquals(h1[key], h2[key]) | ||||
else: | ||||
self.assertEquals(h1[key], h2[key]) | ||||
MinRK
|
r6808 | def test_resubmit_aborted(self): | ||
def f(): | ||||
import random | ||||
return random.random() | ||||
v = self.client.load_balanced_view() | ||||
# restrict to one engine, so we can put a sleep | ||||
# ahead of the task, so it will get aborted | ||||
eid = self.client.ids[-1] | ||||
v.targets = [eid] | ||||
sleep = v.apply_async(time.sleep, 0.5) | ||||
ar = v.apply_async(f) | ||||
ar.abort() | ||||
self.assertRaises(error.TaskAborted, ar.get) | ||||
# Give the Hub a chance to get up to date: | ||||
self._wait_for_idle() | ||||
ahr = self.client.resubmit(ar.msg_ids) | ||||
r2 = ahr.get(1) | ||||
MinRK
|
r3874 | def test_resubmit_inflight(self): | ||
MinRK
|
r6817 | """resubmit of inflight task""" | ||
MinRK
|
r3874 | v = self.client.load_balanced_view() | ||
ar = v.apply_async(time.sleep,1) | ||||
# give the message a chance to arrive | ||||
time.sleep(0.2) | ||||
MinRK
|
r6817 | ahr = self.client.resubmit(ar.msg_ids) | ||
MinRK
|
r3874 | ar.get(2) | ||
MinRK
|
r6817 | ahr.get(2) | ||
MinRK
|
r3874 | |||
def test_resubmit_badkey(self): | ||||
"""ensure KeyError on resubmit of nonexistant task""" | ||||
self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid']) | ||||
MinRK
|
r3875 | |||
def test_purge_results(self): | ||||
MinRK
|
r4146 | # ensure there are some tasks | ||
for i in range(5): | ||||
self.client[:].apply_sync(lambda : 1) | ||||
MinRK
|
r4173 | # Wait for the Hub to realise the result is done: | ||
# This prevents a race condition, where we | ||||
# might purge a result the Hub still thinks is pending. | ||||
time.sleep(0.1) | ||||
rc2 = clientmod.Client(profile='iptest') | ||||
MinRK
|
r3875 | hist = self.client.hub_history() | ||
MinRK
|
r4173 | ahr = rc2.get_result([hist[-1]]) | ||
ahr.wait(10) | ||||
MinRK
|
r4146 | self.client.purge_results(hist[-1]) | ||
MinRK
|
r3875 | newhist = self.client.hub_history() | ||
MinRK
|
r4146 | self.assertEquals(len(newhist)+1,len(hist)) | ||
MinRK
|
r4176 | rc2.spin() | ||
rc2.close() | ||||
MinRK
|
r4146 | |||
def test_purge_all_results(self): | ||||
self.client.purge_results('all') | ||||
hist = self.client.hub_history() | ||||
self.assertEquals(len(hist), 0) | ||||
MinRK
|
r6484 | |||
def test_spin_thread(self): | ||||
self.client.spin_thread(0.01) | ||||
ar = self.client[-1].apply_async(lambda : 1) | ||||
time.sleep(0.1) | ||||
self.assertTrue(ar.wall_time < 0.1, | ||||
"spin should have kept wall_time < 0.1, but got %f" % ar.wall_time | ||||
) | ||||
def test_stop_spin_thread(self): | ||||
self.client.spin_thread(0.01) | ||||
self.client.stop_spin_thread() | ||||
ar = self.client[-1].apply_async(lambda : 1) | ||||
time.sleep(0.15) | ||||
self.assertTrue(ar.wall_time > 0.1, | ||||
"Shouldn't be spinning, but got wall_time=%f" % ar.wall_time | ||||
) | ||||