# -*- coding: utf-8 -*- """test LoadBalancedView objects Authors: * Min RK """ #------------------------------------------------------------------------------- # Copyright (C) 2011 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #------------------------------------------------------------------------------- #------------------------------------------------------------------------------- # Imports #------------------------------------------------------------------------------- import sys import time import zmq from nose import SkipTest from nose.plugins.attrib import attr from IPython import parallel as pmod from IPython.parallel import error from IPython.parallel.tests import add_engines from .clienttest import ClusterTestCase, crash, wait, skip_without def setup(): add_engines(3, total=True) class TestLoadBalancedView(ClusterTestCase): def setUp(self): ClusterTestCase.setUp(self) self.view = self.client.load_balanced_view() @attr('crash') def test_z_crash_task(self): """test graceful handling of engine death (balanced)""" # self.add_engines(1) ar = self.view.apply_async(crash) self.assertRaisesRemote(error.EngineError, ar.get, 10) eid = ar.engine_id tic = time.time() while eid in self.client.ids and time.time()-tic < 5: time.sleep(.01) self.client.spin() self.assertFalse(eid in self.client.ids, "Engine should have died") def test_map(self): def f(x): return x**2 data = list(range(16)) r = self.view.map_sync(f, data) self.assertEqual(r, list(map(f, data))) def test_map_generator(self): def f(x): return x**2 data = list(range(16)) r = self.view.map_sync(f, iter(data)) self.assertEqual(r, list(map(f, iter(data)))) def test_map_short_first(self): def f(x,y): if y is None: return y if x is None: return x return x*y data = list(range(10)) data2 = list(range(4)) r = self.view.map_sync(f, data, data2) self.assertEqual(r, list(map(f, data, data2))) def test_map_short_last(self): def f(x,y): if y is None: return y if x is None: return x return x*y data = list(range(4)) data2 = list(range(10)) r = self.view.map_sync(f, data, data2) self.assertEqual(r, list(map(f, data, data2))) def test_map_unordered(self): def f(x): return x**2 def slow_f(x): import time time.sleep(0.05*x) return x**2 data = list(range(16,0,-1)) reference = list(map(f, data)) amr = self.view.map_async(slow_f, data, ordered=False) self.assertTrue(isinstance(amr, pmod.AsyncMapResult)) # check individual elements, retrieved as they come # list comprehension uses __iter__ astheycame = [ r for r in amr ] # Ensure that at least one result came out of order: self.assertNotEqual(astheycame, reference, "should not have preserved order") self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted") def test_map_ordered(self): def f(x): return x**2 def slow_f(x): import time time.sleep(0.05*x) return x**2 data = list(range(16,0,-1)) reference = list(map(f, data)) amr = self.view.map_async(slow_f, data) self.assertTrue(isinstance(amr, pmod.AsyncMapResult)) # check individual elements, retrieved as they come # list(amr) uses __iter__ astheycame = list(amr) # Ensure that results came in order self.assertEqual(astheycame, reference) self.assertEqual(amr.result, reference) def test_map_iterable(self): """test map on iterables (balanced)""" view = self.view # 101 is prime, so it won't be evenly distributed arr = range(101) # so that it will be an iterator, even in Python 3 it = iter(arr) r = view.map_sync(lambda x:x, arr) self.assertEqual(r, list(arr)) def test_abort(self): view = self.view ar = self.client[:].apply_async(time.sleep, .5) ar = self.client[:].apply_async(time.sleep, .5) time.sleep(0.2) ar2 = view.apply_async(lambda : 2) ar3 = view.apply_async(lambda : 3) view.abort(ar2) view.abort(ar3.msg_ids) self.assertRaises(error.TaskAborted, ar2.get) self.assertRaises(error.TaskAborted, ar3.get) def test_retries(self): self.minimum_engines(3) view = self.view def fail(): assert False for r in range(len(self.client)-1): with view.temp_flags(retries=r): self.assertRaisesRemote(AssertionError, view.apply_sync, fail) with view.temp_flags(retries=len(self.client), timeout=0.1): self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail) def test_short_timeout(self): self.minimum_engines(2) view = self.view def fail(): import time time.sleep(0.25) assert False with view.temp_flags(retries=1, timeout=0.01): self.assertRaisesRemote(AssertionError, view.apply_sync, fail) def test_invalid_dependency(self): view = self.view with view.temp_flags(after='12345'): self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1) def test_impossible_dependency(self): self.minimum_engines(2) view = self.client.load_balanced_view() ar1 = view.apply_async(lambda : 1) ar1.get() e1 = ar1.engine_id e2 = e1 while e2 == e1: ar2 = view.apply_async(lambda : 1) ar2.get() e2 = ar2.engine_id with view.temp_flags(follow=[ar1, ar2]): self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1) def test_follow(self): ar = self.view.apply_async(lambda : 1) ar.get() ars = [] first_id = ar.engine_id self.view.follow = ar for i in range(5): ars.append(self.view.apply_async(lambda : 1)) self.view.wait(ars) for ar in ars: self.assertEqual(ar.engine_id, first_id) def test_after(self): view = self.view ar = view.apply_async(time.sleep, 0.5) with view.temp_flags(after=ar): ar2 = view.apply_async(lambda : 1) ar.wait() ar2.wait() self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))