##// END OF EJS Templates
Make copy of notebook node object to make sure the source doesn't get modified
Make copy of notebook node object to make sure the source doesn't get modified

File last commit:

r10571:16ddad4f
r11375:410cedc0
Show More
test_lbview.py
211 lines | 6.5 KiB | text/x-python | PythonLexer
MinRK
move coding declaration above docstring in test_view...
r4034 # -*- coding: utf-8 -*-
MinRK
update recently changed modules with Authors in docstring
r4018 """test LoadBalancedView objects
Authors:
* Min RK
"""
MinRK
add retries flag to LoadBalancedView...
r3873 #-------------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import sys
import time
import zmq
MinRK
further tweaks to parallel tests...
r4580 from nose import SkipTest
MinRK
tag crash tests with @attr('crash')...
r8200 from nose.plugins.attrib import attr
MinRK
add retries flag to LoadBalancedView...
r3873
from IPython import parallel as pmod
from IPython.parallel import error
from IPython.parallel.tests import add_engines
from .clienttest import ClusterTestCase, crash, wait, skip_without
def setup():
MinRK
expedite IPython.parallel tests...
r6162 add_engines(3, total=True)
MinRK
add retries flag to LoadBalancedView...
r3873
class TestLoadBalancedView(ClusterTestCase):
def setUp(self):
ClusterTestCase.setUp(self)
self.view = self.client.load_balanced_view()
MinRK
tag crash tests with @attr('crash')...
r8200 @attr('crash')
MinRK
add retries flag to LoadBalancedView...
r3873 def test_z_crash_task(self):
"""test graceful handling of engine death (balanced)"""
# self.add_engines(1)
ar = self.view.apply_async(crash)
MinRK
add Client.resubmit for re-running tasks...
r3874 self.assertRaisesRemote(error.EngineError, ar.get, 10)
MinRK
add retries flag to LoadBalancedView...
r3873 eid = ar.engine_id
tic = time.time()
while eid in self.client.ids and time.time()-tic < 5:
time.sleep(.01)
self.client.spin()
self.assertFalse(eid in self.client.ids, "Engine should have died")
def test_map(self):
def f(x):
return x**2
data = range(16)
r = self.view.map_sync(f, data)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(r, map(f, data))
MinRK
test view.map on a generator
r10571
def test_map_generator(self):
def f(x):
return x**2
data = range(16)
r = self.view.map_sync(f, iter(data))
self.assertEqual(r, map(f, iter(data)))
MinRK
test view.map on mismatches sequences
r10569 def test_map_short_first(self):
def f(x,y):
if y is None:
return y
if x is None:
return x
return x*y
data = range(10)
data2 = range(4)
r = self.view.map_sync(f, data, data2)
self.assertEqual(r, map(f, data, data2))
def test_map_short_last(self):
def f(x,y):
if y is None:
return y
if x is None:
return x
return x*y
data = range(4)
data2 = range(10)
r = self.view.map_sync(f, data, data2)
self.assertEqual(r, map(f, data, data2))
MinRK
add unordered iteration to AsyncMapResults...
r5171 def test_map_unordered(self):
def f(x):
return x**2
def slow_f(x):
import time
time.sleep(0.05*x)
return x**2
data = range(16,0,-1)
reference = map(f, data)
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 amr = self.view.map_async(slow_f, data, ordered=False)
MinRK
add unordered iteration to AsyncMapResults...
r5171 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 # check individual elements, retrieved as they come
# list comprehension uses __iter__
astheycame = [ r for r in amr ]
MinRK
add unordered iteration to AsyncMapResults...
r5171 # Ensure that at least one result came out of order:
Bradley M. Froehle
s/assertNotEquals/assertNotEqual/
r7878 self.assertNotEqual(astheycame, reference, "should not have preserved order")
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
MinRK
add unordered iteration to AsyncMapResults...
r5171
def test_map_ordered(self):
def f(x):
return x**2
def slow_f(x):
import time
time.sleep(0.05*x)
return x**2
data = range(16,0,-1)
reference = map(f, data)
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 amr = self.view.map_async(slow_f, data)
MinRK
add unordered iteration to AsyncMapResults...
r5171 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 # check individual elements, retrieved as they come
# list(amr) uses __iter__
MinRK
add unordered iteration to AsyncMapResults...
r5171 astheycame = list(amr)
# Ensure that results came in order
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(astheycame, reference)
self.assertEqual(amr.result, reference)
MinRK
support iterators in view.map...
r5560
def test_map_iterable(self):
"""test map on iterables (balanced)"""
view = self.view
# 101 is prime, so it won't be evenly distributed
arr = range(101)
# so that it will be an iterator, even in Python 3
it = iter(arr)
r = view.map_sync(lambda x:x, arr)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(r, list(arr))
MinRK
support iterators in view.map...
r5560
MinRK
add unordered iteration to AsyncMapResults...
r5171
MinRK
add retries flag to LoadBalancedView...
r3873 def test_abort(self):
view = self.view
ar = self.client[:].apply_async(time.sleep, .5)
MinRK
improve reliability of occasionally failing parallel tests
r4469 ar = self.client[:].apply_async(time.sleep, .5)
MinRK
further tweaks to parallel tests...
r4580 time.sleep(0.2)
MinRK
add retries flag to LoadBalancedView...
r3873 ar2 = view.apply_async(lambda : 2)
ar3 = view.apply_async(lambda : 3)
view.abort(ar2)
view.abort(ar3.msg_ids)
self.assertRaises(error.TaskAborted, ar2.get)
self.assertRaises(error.TaskAborted, ar3.get)
def test_retries(self):
view = self.view
view.timeout = 1 # prevent hang if this doesn't behave
def fail():
assert False
for r in range(len(self.client)-1):
with view.temp_flags(retries=r):
self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
with view.temp_flags(retries=len(self.client), timeout=0.25):
self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
def test_invalid_dependency(self):
view = self.view
with view.temp_flags(after='12345'):
self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
def test_impossible_dependency(self):
MinRK
expedite IPython.parallel tests...
r6162 self.minimum_engines(2)
MinRK
add retries flag to LoadBalancedView...
r3873 view = self.client.load_balanced_view()
ar1 = view.apply_async(lambda : 1)
ar1.get()
e1 = ar1.engine_id
e2 = e1
while e2 == e1:
ar2 = view.apply_async(lambda : 1)
ar2.get()
e2 = ar2.engine_id
with view.temp_flags(follow=[ar1, ar2]):
self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
def test_follow(self):
ar = self.view.apply_async(lambda : 1)
ar.get()
ars = []
first_id = ar.engine_id
self.view.follow = ar
for i in range(5):
ars.append(self.view.apply_async(lambda : 1))
self.view.wait(ars)
for ar in ars:
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(ar.engine_id, first_id)
MinRK
add retries flag to LoadBalancedView...
r3873
def test_after(self):
view = self.view
ar = view.apply_async(time.sleep, 0.5)
with view.temp_flags(after=ar):
ar2 = view.apply_async(lambda : 1)
ar.wait()
ar2.wait()
MinRK
relax timestamp comparison from \ge to \geq in test...
r5077 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))