##// END OF EJS Templates
Merge pull request #6269 from takluyver/atomic-save...
Merge pull request #6269 from takluyver/atomic-save Implement atomic save

File last commit:

r13383:c4ee496b
r17754:15ecb841 merge
Show More
test_lbview.py
221 lines | 6.9 KiB | text/x-python | PythonLexer
MinRK
move coding declaration above docstring in test_view...
r4034 # -*- coding: utf-8 -*-
MinRK
update recently changed modules with Authors in docstring
r4018 """test LoadBalancedView objects
Authors:
* Min RK
"""
MinRK
add retries flag to LoadBalancedView...
r3873 #-------------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import sys
import time
import zmq
MinRK
further tweaks to parallel tests...
r4580 from nose import SkipTest
MinRK
tag crash tests with @attr('crash')...
r8200 from nose.plugins.attrib import attr
MinRK
add retries flag to LoadBalancedView...
r3873
from IPython import parallel as pmod
from IPython.parallel import error
from IPython.parallel.tests import add_engines
from .clienttest import ClusterTestCase, crash, wait, skip_without
def setup():
MinRK
expedite IPython.parallel tests...
r6162 add_engines(3, total=True)
MinRK
add retries flag to LoadBalancedView...
r3873
class TestLoadBalancedView(ClusterTestCase):
def setUp(self):
ClusterTestCase.setUp(self)
self.view = self.client.load_balanced_view()
MinRK
tag crash tests with @attr('crash')...
r8200 @attr('crash')
MinRK
add retries flag to LoadBalancedView...
r3873 def test_z_crash_task(self):
"""test graceful handling of engine death (balanced)"""
# self.add_engines(1)
ar = self.view.apply_async(crash)
MinRK
add Client.resubmit for re-running tasks...
r3874 self.assertRaisesRemote(error.EngineError, ar.get, 10)
MinRK
add retries flag to LoadBalancedView...
r3873 eid = ar.engine_id
tic = time.time()
while eid in self.client.ids and time.time()-tic < 5:
time.sleep(.01)
self.client.spin()
self.assertFalse(eid in self.client.ids, "Engine should have died")
def test_map(self):
def f(x):
return x**2
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(16))
MinRK
add retries flag to LoadBalancedView...
r3873 r = self.view.map_sync(f, data)
Thomas Kluyver
Fix parallel test suite
r13383 self.assertEqual(r, list(map(f, data)))
MinRK
test view.map on a generator
r10571
def test_map_generator(self):
def f(x):
return x**2
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(16))
MinRK
test view.map on a generator
r10571 r = self.view.map_sync(f, iter(data))
Thomas Kluyver
Fix parallel test suite
r13383 self.assertEqual(r, list(map(f, iter(data))))
MinRK
test view.map on a generator
r10571
MinRK
test view.map on mismatches sequences
r10569 def test_map_short_first(self):
def f(x,y):
if y is None:
return y
if x is None:
return x
return x*y
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(10))
data2 = list(range(4))
MinRK
test view.map on mismatches sequences
r10569
r = self.view.map_sync(f, data, data2)
Thomas Kluyver
Fix parallel test suite
r13383 self.assertEqual(r, list(map(f, data, data2)))
MinRK
test view.map on mismatches sequences
r10569
def test_map_short_last(self):
def f(x,y):
if y is None:
return y
if x is None:
return x
return x*y
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(4))
data2 = list(range(10))
MinRK
test view.map on mismatches sequences
r10569
r = self.view.map_sync(f, data, data2)
Thomas Kluyver
Fix parallel test suite
r13383 self.assertEqual(r, list(map(f, data, data2)))
MinRK
test view.map on mismatches sequences
r10569
MinRK
add unordered iteration to AsyncMapResults...
r5171 def test_map_unordered(self):
def f(x):
return x**2
def slow_f(x):
import time
time.sleep(0.05*x)
return x**2
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(16,0,-1))
reference = list(map(f, data))
MinRK
add unordered iteration to AsyncMapResults...
r5171
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 amr = self.view.map_async(slow_f, data, ordered=False)
MinRK
add unordered iteration to AsyncMapResults...
r5171 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 # check individual elements, retrieved as they come
# list comprehension uses __iter__
astheycame = [ r for r in amr ]
MinRK
add unordered iteration to AsyncMapResults...
r5171 # Ensure that at least one result came out of order:
Bradley M. Froehle
s/assertNotEquals/assertNotEqual/
r7878 self.assertNotEqual(astheycame, reference, "should not have preserved order")
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
MinRK
add unordered iteration to AsyncMapResults...
r5171
def test_map_ordered(self):
def f(x):
return x**2
def slow_f(x):
import time
time.sleep(0.05*x)
return x**2
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(16,0,-1))
reference = list(map(f, data))
MinRK
add unordered iteration to AsyncMapResults...
r5171
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 amr = self.view.map_async(slow_f, data)
MinRK
add unordered iteration to AsyncMapResults...
r5171 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 # check individual elements, retrieved as they come
# list(amr) uses __iter__
MinRK
add unordered iteration to AsyncMapResults...
r5171 astheycame = list(amr)
# Ensure that results came in order
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(astheycame, reference)
self.assertEqual(amr.result, reference)
MinRK
support iterators in view.map...
r5560
def test_map_iterable(self):
"""test map on iterables (balanced)"""
view = self.view
# 101 is prime, so it won't be evenly distributed
arr = range(101)
# so that it will be an iterator, even in Python 3
it = iter(arr)
r = view.map_sync(lambda x:x, arr)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(r, list(arr))
MinRK
support iterators in view.map...
r5560
MinRK
add unordered iteration to AsyncMapResults...
r5171
MinRK
add retries flag to LoadBalancedView...
r3873 def test_abort(self):
view = self.view
ar = self.client[:].apply_async(time.sleep, .5)
MinRK
improve reliability of occasionally failing parallel tests
r4469 ar = self.client[:].apply_async(time.sleep, .5)
MinRK
further tweaks to parallel tests...
r4580 time.sleep(0.2)
MinRK
add retries flag to LoadBalancedView...
r3873 ar2 = view.apply_async(lambda : 2)
ar3 = view.apply_async(lambda : 3)
view.abort(ar2)
view.abort(ar3.msg_ids)
self.assertRaises(error.TaskAborted, ar2.get)
self.assertRaises(error.TaskAborted, ar3.get)
def test_retries(self):
MinRK
set minimum engines in a few tests
r12788 self.minimum_engines(3)
MinRK
add retries flag to LoadBalancedView...
r3873 view = self.view
def fail():
assert False
for r in range(len(self.client)-1):
with view.temp_flags(retries=r):
self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
MinRK
test very short timeouts...
r12786 with view.temp_flags(retries=len(self.client), timeout=0.1):
MinRK
add retries flag to LoadBalancedView...
r3873 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
MinRK
test very short timeouts...
r12786 def test_short_timeout(self):
MinRK
set minimum engines in a few tests
r12788 self.minimum_engines(2)
MinRK
test very short timeouts...
r12786 view = self.view
def fail():
import time
time.sleep(0.25)
assert False
with view.temp_flags(retries=1, timeout=0.01):
self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
MinRK
add retries flag to LoadBalancedView...
r3873 def test_invalid_dependency(self):
view = self.view
with view.temp_flags(after='12345'):
self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
def test_impossible_dependency(self):
MinRK
expedite IPython.parallel tests...
r6162 self.minimum_engines(2)
MinRK
add retries flag to LoadBalancedView...
r3873 view = self.client.load_balanced_view()
ar1 = view.apply_async(lambda : 1)
ar1.get()
e1 = ar1.engine_id
e2 = e1
while e2 == e1:
ar2 = view.apply_async(lambda : 1)
ar2.get()
e2 = ar2.engine_id
with view.temp_flags(follow=[ar1, ar2]):
self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
def test_follow(self):
ar = self.view.apply_async(lambda : 1)
ar.get()
ars = []
first_id = ar.engine_id
self.view.follow = ar
for i in range(5):
ars.append(self.view.apply_async(lambda : 1))
self.view.wait(ars)
for ar in ars:
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(ar.engine_id, first_id)
MinRK
add retries flag to LoadBalancedView...
r3873
def test_after(self):
view = self.view
ar = view.apply_async(time.sleep, 0.5)
with view.temp_flags(after=ar):
ar2 = view.apply_async(lambda : 1)
ar.wait()
ar2.wait()
MinRK
relax timestamp comparison from \ge to \geq in test...
r5077 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))