test_lbview.py
221 lines
| 6.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4034 | # -*- coding: utf-8 -*- | ||
MinRK
|
r4018 | """test LoadBalancedView objects | ||
Authors: | ||||
* Min RK | ||||
""" | ||||
MinRK
|
r3873 | #------------------------------------------------------------------------------- | ||
# Copyright (C) 2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
import sys | ||||
import time | ||||
import zmq | ||||
MinRK
|
r4580 | from nose import SkipTest | ||
MinRK
|
r8200 | from nose.plugins.attrib import attr | ||
MinRK
|
r3873 | |||
from IPython import parallel as pmod | ||||
from IPython.parallel import error | ||||
from IPython.parallel.tests import add_engines | ||||
from .clienttest import ClusterTestCase, crash, wait, skip_without | ||||
def setup():
    """Module-level fixture: request engines before the tests here run.

    NOTE(review): ``add_engines`` is imported from ``IPython.parallel.tests``;
    ``total=True`` presumably means "make sure three engines exist in total"
    rather than "start three more" -- confirm against that helper.
    """
    engine_count = 3
    add_engines(engine_count, total=True)
MinRK
|
r3873 | |||
class TestLoadBalancedView(ClusterTestCase):
    """Tests for the view returned by ``client.load_balanced_view()``."""

    def setUp(self):
        """Run the base-class setup, then create a fresh load-balanced view."""
        ClusterTestCase.setUp(self)
        self.view = self.client.load_balanced_view()

    @attr('crash')
    def test_z_crash_task(self):
        """test graceful handling of engine death (balanced)"""
        ar = self.view.apply_async(crash)
        self.assertRaisesRemote(error.EngineError, ar.get, 10)
        eid = ar.engine_id
        # Poll for up to 5 seconds until the engine id drops out of
        # client.ids, spinning the client on each pass.
        tic = time.time()
        while eid in self.client.ids and time.time() - tic < 5:
            time.sleep(.01)
            self.client.spin()
        self.assertFalse(eid in self.client.ids, "Engine should have died")

    def test_map(self):
        """test map_sync against the builtin map (balanced)"""
        def f(x):
            return x**2
        data = range(16)
        r = self.view.map_sync(f, data)
        # list(...) so the reference is a real list even where map() is lazy.
        self.assertEqual(r, list(map(f, data)))

    def test_map_generator(self):
        """test map_sync when the input is an iterator (balanced)"""
        def f(x):
            return x**2
        data = range(16)
        r = self.view.map_sync(f, iter(data))
        self.assertEqual(r, list(map(f, iter(data))))

    def test_map_short_first(self):
        """test map over two sequences of unequal length (10 and 4 items)"""
        def f(x, y):
            # Either argument may be None if the local map pads the shorter
            # sequence; pass the None straight through in that case.
            if y is None:
                return y
            if x is None:
                return x
            return x*y
        data = range(10)
        data2 = range(4)
        r = self.view.map_sync(f, data, data2)
        self.assertEqual(r, list(map(f, data, data2)))

    def test_map_short_last(self):
        """test map over two sequences of unequal length (4 and 10 items)"""
        def f(x, y):
            # Either argument may be None if the local map pads the shorter
            # sequence; pass the None straight through in that case.
            if y is None:
                return y
            if x is None:
                return x
            return x*y
        data = range(4)
        data2 = range(10)
        r = self.view.map_sync(f, data, data2)
        self.assertEqual(r, list(map(f, data, data2)))

    def test_map_unordered(self):
        """test map_async with ordered=False yields results out of order"""
        def f(x):
            return x**2

        def slow_f(x):
            # Sleep proportionally to x so that larger inputs take longer.
            import time
            time.sleep(0.05*x)
            return x**2

        # Descending input: the first items submitted are the slowest ones.
        data = range(16, 0, -1)
        reference = list(map(f, data))

        amr = self.view.map_async(slow_f, data, ordered=False)
        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
        # check individual elements, retrieved as they come
        # list comprehension uses __iter__
        astheycame = [r for r in amr]
        # Ensure that at least one result came out of order:
        self.assertNotEqual(astheycame, reference, "should not have preserved order")
        self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")

    def test_map_ordered(self):
        """test map_async with default ordering yields results in order"""
        def f(x):
            return x**2

        def slow_f(x):
            # Sleep proportionally to x so that larger inputs take longer.
            import time
            time.sleep(0.05*x)
            return x**2

        data = range(16, 0, -1)
        reference = list(map(f, data))

        amr = self.view.map_async(slow_f, data)
        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
        # check individual elements, retrieved as they come
        # list(amr) uses __iter__
        astheycame = list(amr)
        # Ensure that results came in order
        self.assertEqual(astheycame, reference)
        self.assertEqual(amr.result, reference)

    def test_map_iterable(self):
        """test map on iterables (balanced)"""
        view = self.view
        # 101 is prime, so it won't be evenly distributed
        arr = range(101)
        # so that it will be an iterator, even in Python 3
        it = iter(arr)
        # Pass the iterator itself; passing ``arr`` would skip the case this
        # test is named for.
        r = view.map_sync(lambda x: x, it)
        self.assertEqual(r, list(arr))

    def test_abort(self):
        """test aborting tasks by AsyncResult and by msg_ids (balanced)"""
        view = self.view
        # Submit two half-second sleeps through client[:] -- presumably so the
        # engines are still busy when the tasks below are aborted.
        for _ in range(2):
            self.client[:].apply_async(time.sleep, .5)
        time.sleep(0.2)
        ar2 = view.apply_async(lambda: 2)
        ar3 = view.apply_async(lambda: 3)
        # abort() is exercised with both an AsyncResult and a list of msg_ids.
        view.abort(ar2)
        view.abort(ar3.msg_ids)
        self.assertRaises(error.TaskAborted, ar2.get)
        self.assertRaises(error.TaskAborted, ar3.get)

    def test_retries(self):
        """test the retries flag on a task that always fails (balanced)"""
        self.minimum_engines(3)
        view = self.view

        def fail():
            assert False

        # With fewer retries than engines, the remote AssertionError surfaces.
        for r in range(len(self.client)-1):
            with view.temp_flags(retries=r):
                self.assertRaisesRemote(AssertionError, view.apply_sync, fail)

        # With as many retries as engines plus a short timeout, the task is
        # expected to end in TaskTimeout instead.
        with view.temp_flags(retries=len(self.client), timeout=0.1):
            self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)

    def test_short_timeout(self):
        """test a timeout shorter than the task's own runtime (balanced)"""
        self.minimum_engines(2)
        view = self.view

        def fail():
            import time
            time.sleep(0.25)
            assert False

        # The task sleeps 0.25s but the timeout is 0.01s; the expected outcome
        # is still the task's AssertionError.
        with view.temp_flags(retries=1, timeout=0.01):
            self.assertRaisesRemote(AssertionError, view.apply_sync, fail)

    def test_invalid_dependency(self):
        """test that an unknown 'after' id raises InvalidDependency"""
        view = self.view
        with view.temp_flags(after='12345'):
            self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda: 1)

    def test_impossible_dependency(self):
        """test that following tasks from two engines is impossible"""
        self.minimum_engines(2)
        view = self.client.load_balanced_view()
        ar1 = view.apply_async(lambda: 1)
        ar1.get()
        e1 = ar1.engine_id
        # Keep submitting until a task lands on a different engine than ar1.
        e2 = e1
        while e2 == e1:
            ar2 = view.apply_async(lambda: 1)
            ar2.get()
            e2 = ar2.engine_id

        with view.temp_flags(follow=[ar1, ar2]):
            self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda: 1)

    def test_follow(self):
        """test that tasks following a result run on the same engine"""
        ar = self.view.apply_async(lambda: 1)
        ar.get()
        ars = []
        first_id = ar.engine_id

        self.view.follow = ar
        for i in range(5):
            ars.append(self.view.apply_async(lambda: 1))
        self.view.wait(ars)
        # Distinct loop name so the followed result ``ar`` is not rebound.
        for followed in ars:
            self.assertEqual(followed.engine_id, first_id)

    def test_after(self):
        """test that a task flagged 'after' another starts once it completes"""
        view = self.view
        ar = view.apply_async(time.sleep, 0.5)
        with view.temp_flags(after=ar):
            ar2 = view.apply_async(lambda: 1)

        ar.wait()
        ar2.wait()
        # Report the same two timestamps that the condition compares.
        self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar2.started, ar.completed))