test_lbview.py
176 lines
| 5.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r4034 | # -*- coding: utf-8 -*- | ||
MinRK
|
"""Test LoadBalancedView objects.

Authors:

* Min RK
"""
MinRK
|
r3873 | #------------------------------------------------------------------------------- | ||
# Copyright (C) 2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#------------------------------------------------------------------------------- | ||||
#------------------------------------------------------------------------------- | ||||
# Imports | ||||
#------------------------------------------------------------------------------- | ||||
import sys | ||||
import time | ||||
import zmq | ||||
MinRK
|
r4580 | from nose import SkipTest | ||
MinRK
|
r3873 | |||
from IPython import parallel as pmod | ||||
from IPython.parallel import error | ||||
from IPython.parallel.tests import add_engines | ||||
from .clienttest import ClusterTestCase, crash, wait, skip_without | ||||
def setup():
    """Module-level fixture: request engines before any test here runs.

    Delegates to ``add_engines(3, total=True)`` from ``IPython.parallel.tests``.
    NOTE(review): presumably ``total=True`` means "make sure three engines
    exist in total" rather than "start three more" -- confirm against
    ``add_engines``.
    """
    add_engines(3, total=True)
MinRK
|
r3873 | |||
class TestLoadBalancedView(ClusterTestCase):
    """Tests for the view returned by ``client.load_balanced_view()``.

    Each test gets a fresh view in ``self.view`` (see ``setUp``).
    """

    def setUp(self):
        """Run the base-class setup, then create the load-balanced view."""
        ClusterTestCase.setUp(self)
        self.view = self.client.load_balanced_view()

    def test_z_crash_task(self):
        """test graceful handling of engine death (balanced)"""
        # Deliberately disabled: everything after this raise is unreachable
        # and is kept only so the test can be re-enabled by deleting it.
        raise SkipTest("crash tests disabled, due to undesirable crash reports")
        # self.add_engines(1)
        ar = self.view.apply_async(crash)
        self.assertRaisesRemote(error.EngineError, ar.get, 10)
        eid = ar.engine_id
        tic = time.time()
        # Poll for up to 5 seconds until the engine id leaves client.ids.
        while eid in self.client.ids and time.time() - tic < 5:
            time.sleep(.01)
            self.client.spin()
        self.assertFalse(eid in self.client.ids, "Engine should have died")

    def test_map(self):
        """map_sync returns the same values as the builtin map."""
        def f(x):
            return x**2
        data = range(16)
        r = self.view.map_sync(f, data)
        # list(...) so the comparison also holds where map() is lazy (Python 3).
        self.assertEqual(r, list(map(f, data)))

    def test_map_unordered(self):
        """map_async(ordered=False) yields every result, but not in order."""
        def f(x):
            return x**2

        def slow_f(x):
            # Sleep proportionally to x so larger inputs take longer.
            import time
            time.sleep(0.05*x)
            return x**2

        # Descending input: the first-submitted items sleep the longest.
        data = range(16, 0, -1)
        reference = list(map(f, data))

        amr = self.view.map_async(slow_f, data, ordered=False)
        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
        # check individual elements, retrieved as they come
        # list comprehension uses __iter__
        astheycame = [r for r in amr]
        # Ensure that at least one result came out of order:
        self.assertNotEqual(astheycame, reference, "should not have preserved order")
        # reference is descending, hence reverse=True.
        self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")

    def test_map_ordered(self):
        """map_async with default ordering yields results in submission order."""
        def f(x):
            return x**2

        def slow_f(x):
            # Sleep proportionally to x so larger inputs take longer.
            import time
            time.sleep(0.05*x)
            return x**2

        data = range(16, 0, -1)
        reference = list(map(f, data))

        amr = self.view.map_async(slow_f, data)
        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
        # check individual elements, retrieved as they come
        # list(amr) uses __iter__
        astheycame = list(amr)
        # Ensure that results came in order
        self.assertEqual(astheycame, reference)
        self.assertEqual(amr.result, reference)

    def test_map_iterable(self):
        """test map on iterables (balanced)"""
        view = self.view
        # 101 is prime, so it won't be evenly distributed
        arr = range(101)
        # so that it will be an iterator, even in Python 3
        it = iter(arr)
        # Pass the iterator itself: the original passed `arr` and left `it`
        # unused, so the iterator case was never exercised.
        r = view.map_sync(lambda x: x, it)
        self.assertEqual(r, list(arr))

    def test_abort(self):
        """Tasks aborted via view.abort raise TaskAborted from get()."""
        view = self.view
        # The sleep is submitted twice through self.client[:] in the original.
        # NOTE(review): presumably this keeps the engines busy long enough for
        # the aborts below to arrive before ar2/ar3 start -- confirm before
        # collapsing into a single call.
        ar = self.client[:].apply_async(time.sleep, .5)
        ar = self.client[:].apply_async(time.sleep, .5)
        time.sleep(0.2)
        ar2 = view.apply_async(lambda: 2)
        ar3 = view.apply_async(lambda: 3)
        # abort is called once with the result object, once with its msg_ids.
        view.abort(ar2)
        view.abort(ar3.msg_ids)
        self.assertRaises(error.TaskAborted, ar2.get)
        self.assertRaises(error.TaskAborted, ar3.get)

    def test_retries(self):
        """Check the `retries` flag against the number of available engines."""
        view = self.view
        view.timeout = 1  # prevent hang if this doesn't behave

        def fail():
            assert False

        # retries in 0 .. len(client)-2: the remote AssertionError surfaces.
        for r in range(len(self.client) - 1):
            with view.temp_flags(retries=r):
                self.assertRaisesRemote(AssertionError, view.apply_sync, fail)

        # retries == len(client), short timeout: TaskTimeout is expected.
        with view.temp_flags(retries=len(self.client), timeout=0.25):
            self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)

    def test_invalid_dependency(self):
        """An `after` flag of '12345' raises InvalidDependency."""
        view = self.view
        with view.temp_flags(after='12345'):
            self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda: 1)

    def test_impossible_dependency(self):
        """Following two tasks that ran on different engines is impossible."""
        self.minimum_engines(2)
        view = self.client.load_balanced_view()
        ar1 = view.apply_async(lambda: 1)
        ar1.get()
        e1 = ar1.engine_id
        e2 = e1
        # Keep submitting until a task lands on an engine other than e1.
        while e2 == e1:
            ar2 = view.apply_async(lambda: 1)
            ar2.get()
            e2 = ar2.engine_id

        with view.temp_flags(follow=[ar1, ar2]):
            self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda: 1)

    def test_follow(self):
        """Tasks submitted with view.follow set run on the followed task's engine."""
        ar = self.view.apply_async(lambda: 1)
        ar.get()
        first_id = ar.engine_id

        self.view.follow = ar
        ars = [self.view.apply_async(lambda: 1) for _ in range(5)]
        self.view.wait(ars)
        # Separate loop name so the followed result `ar` is not rebound.
        for followed in ars:
            self.assertEqual(followed.engine_id, first_id)

    def test_after(self):
        """A task flagged after=ar does not start before ar completes."""
        view = self.view
        ar = view.apply_async(time.sleep, 0.5)
        with view.temp_flags(after=ar):
            ar2 = view.apply_async(lambda: 1)

        ar.wait()
        ar2.wait()
        # The message reports the same two values the assertion compares
        # (the original printed ar.started instead of ar2.started).
        self.assertTrue(ar2.started >= ar.completed, "%s not >= %s" % (ar2.started, ar.completed))