##// END OF EJS Templates
Backport PR #8129: Catch interrupted poll() in terminal console...
Backport PR #8129: Catch interrupted poll() in terminal console Alternative to my own PR #8108 - catch ZMQError in run_cell, and if it's caused by an interrupt, ignore it. #8108 catches the exception in the blocking kernel client API, which is more complex, especially if we want to handle the timeout nicely as proposed in the comments, but it's possibly also more convenient for other users of that API. Or perhaps not - I'm not sure what makes sense for other API consumers in this case. Fixes gh-8105

File last commit:

r13383:c4ee496b
r20883:0d9a347c
Show More
test_lbview.py
221 lines | 6.9 KiB | text/x-python | PythonLexer
MinRK
move coding declaration above docstring in test_view...
r4034 # -*- coding: utf-8 -*-
MinRK
update recently changed modules with Authors in docstring
r4018 """test LoadBalancedView objects
Authors:
* Min RK
"""
MinRK
add retries flag to LoadBalancedView...
r3873 #-------------------------------------------------------------------------------
# Copyright (C) 2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-------------------------------------------------------------------------------
#-------------------------------------------------------------------------------
# Imports
#-------------------------------------------------------------------------------
import sys
import time
import zmq
MinRK
further tweaks to parallel tests...
r4580 from nose import SkipTest
MinRK
tag crash tests with @attr('crash')...
r8200 from nose.plugins.attrib import attr
MinRK
add retries flag to LoadBalancedView...
r3873
from IPython import parallel as pmod
from IPython.parallel import error
from IPython.parallel.tests import add_engines
from .clienttest import ClusterTestCase, crash, wait, skip_without
def setup():
MinRK
expedite IPython.parallel tests...
r6162 add_engines(3, total=True)
MinRK
add retries flag to LoadBalancedView...
r3873
class TestLoadBalancedView(ClusterTestCase):
def setUp(self):
ClusterTestCase.setUp(self)
self.view = self.client.load_balanced_view()
MinRK
tag crash tests with @attr('crash')...
r8200 @attr('crash')
MinRK
add retries flag to LoadBalancedView...
r3873 def test_z_crash_task(self):
"""test graceful handling of engine death (balanced)"""
# self.add_engines(1)
ar = self.view.apply_async(crash)
MinRK
add Client.resubmit for re-running tasks...
r3874 self.assertRaisesRemote(error.EngineError, ar.get, 10)
MinRK
add retries flag to LoadBalancedView...
r3873 eid = ar.engine_id
tic = time.time()
while eid in self.client.ids and time.time()-tic < 5:
time.sleep(.01)
self.client.spin()
self.assertFalse(eid in self.client.ids, "Engine should have died")
def test_map(self):
def f(x):
return x**2
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(16))
MinRK
add retries flag to LoadBalancedView...
r3873 r = self.view.map_sync(f, data)
Thomas Kluyver
Fix parallel test suite
r13383 self.assertEqual(r, list(map(f, data)))
MinRK
test view.map on a generator
r10571
def test_map_generator(self):
def f(x):
return x**2
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(16))
MinRK
test view.map on a generator
r10571 r = self.view.map_sync(f, iter(data))
Thomas Kluyver
Fix parallel test suite
r13383 self.assertEqual(r, list(map(f, iter(data))))
MinRK
test view.map on a generator
r10571
MinRK
test view.map on mismatches sequences
r10569 def test_map_short_first(self):
def f(x,y):
if y is None:
return y
if x is None:
return x
return x*y
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(10))
data2 = list(range(4))
MinRK
test view.map on mismatches sequences
r10569
r = self.view.map_sync(f, data, data2)
Thomas Kluyver
Fix parallel test suite
r13383 self.assertEqual(r, list(map(f, data, data2)))
MinRK
test view.map on mismatches sequences
r10569
def test_map_short_last(self):
def f(x,y):
if y is None:
return y
if x is None:
return x
return x*y
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(4))
data2 = list(range(10))
MinRK
test view.map on mismatches sequences
r10569
r = self.view.map_sync(f, data, data2)
Thomas Kluyver
Fix parallel test suite
r13383 self.assertEqual(r, list(map(f, data, data2)))
MinRK
test view.map on mismatches sequences
r10569
MinRK
add unordered iteration to AsyncMapResults...
r5171 def test_map_unordered(self):
def f(x):
return x**2
def slow_f(x):
import time
time.sleep(0.05*x)
return x**2
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(16,0,-1))
reference = list(map(f, data))
MinRK
add unordered iteration to AsyncMapResults...
r5171
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 amr = self.view.map_async(slow_f, data, ordered=False)
MinRK
add unordered iteration to AsyncMapResults...
r5171 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 # check individual elements, retrieved as they come
# list comprehension uses __iter__
astheycame = [ r for r in amr ]
MinRK
add unordered iteration to AsyncMapResults...
r5171 # Ensure that at least one result came out of order:
Bradley M. Froehle
s/assertNotEquals/assertNotEqual/
r7878 self.assertNotEqual(astheycame, reference, "should not have preserved order")
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
MinRK
add unordered iteration to AsyncMapResults...
r5171
def test_map_ordered(self):
def f(x):
return x**2
def slow_f(x):
import time
time.sleep(0.05*x)
return x**2
Thomas Kluyver
Fix parallel test suite
r13383 data = list(range(16,0,-1))
reference = list(map(f, data))
MinRK
add unordered iteration to AsyncMapResults...
r5171
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 amr = self.view.map_async(slow_f, data)
MinRK
add unordered iteration to AsyncMapResults...
r5171 self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 # check individual elements, retrieved as they come
# list(amr) uses __iter__
MinRK
add unordered iteration to AsyncMapResults...
r5171 astheycame = list(amr)
# Ensure that results came in order
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(astheycame, reference)
self.assertEqual(amr.result, reference)
MinRK
support iterators in view.map...
r5560
def test_map_iterable(self):
"""test map on iterables (balanced)"""
view = self.view
# 101 is prime, so it won't be evenly distributed
arr = range(101)
# so that it will be an iterator, even in Python 3
it = iter(arr)
r = view.map_sync(lambda x:x, arr)
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(r, list(arr))
MinRK
support iterators in view.map...
r5560
MinRK
add unordered iteration to AsyncMapResults...
r5171
MinRK
add retries flag to LoadBalancedView...
r3873 def test_abort(self):
view = self.view
ar = self.client[:].apply_async(time.sleep, .5)
MinRK
improve reliability of occasionally failing parallel tests
r4469 ar = self.client[:].apply_async(time.sleep, .5)
MinRK
further tweaks to parallel tests...
r4580 time.sleep(0.2)
MinRK
add retries flag to LoadBalancedView...
r3873 ar2 = view.apply_async(lambda : 2)
ar3 = view.apply_async(lambda : 3)
view.abort(ar2)
view.abort(ar3.msg_ids)
self.assertRaises(error.TaskAborted, ar2.get)
self.assertRaises(error.TaskAborted, ar3.get)
def test_retries(self):
MinRK
set minimum engines in a few tests
r12788 self.minimum_engines(3)
MinRK
add retries flag to LoadBalancedView...
r3873 view = self.view
def fail():
assert False
for r in range(len(self.client)-1):
with view.temp_flags(retries=r):
self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
MinRK
test very short timeouts...
r12786 with view.temp_flags(retries=len(self.client), timeout=0.1):
MinRK
add retries flag to LoadBalancedView...
r3873 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
MinRK
test very short timeouts...
r12786 def test_short_timeout(self):
MinRK
set minimum engines in a few tests
r12788 self.minimum_engines(2)
MinRK
test very short timeouts...
r12786 view = self.view
def fail():
import time
time.sleep(0.25)
assert False
with view.temp_flags(retries=1, timeout=0.01):
self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
MinRK
add retries flag to LoadBalancedView...
r3873 def test_invalid_dependency(self):
view = self.view
with view.temp_flags(after='12345'):
self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
def test_impossible_dependency(self):
MinRK
expedite IPython.parallel tests...
r6162 self.minimum_engines(2)
MinRK
add retries flag to LoadBalancedView...
r3873 view = self.client.load_balanced_view()
ar1 = view.apply_async(lambda : 1)
ar1.get()
e1 = ar1.engine_id
e2 = e1
while e2 == e1:
ar2 = view.apply_async(lambda : 1)
ar2.get()
e2 = ar2.engine_id
with view.temp_flags(follow=[ar1, ar2]):
self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
def test_follow(self):
ar = self.view.apply_async(lambda : 1)
ar.get()
ars = []
first_id = ar.engine_id
self.view.follow = ar
for i in range(5):
ars.append(self.view.apply_async(lambda : 1))
self.view.wait(ars)
for ar in ars:
Bradley M. Froehle
s/assertEquals/assertEqual/
r7874 self.assertEqual(ar.engine_id, first_id)
MinRK
add retries flag to LoadBalancedView...
r3873
def test_after(self):
view = self.view
ar = view.apply_async(time.sleep, 0.5)
with view.temp_flags(after=ar):
ar2 = view.apply_async(lambda : 1)
ar.wait()
ar2.wait()
MinRK
relax timestamp comparison from \ge to \geq in test...
r5077 self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))