|
|
# -*- coding: utf-8 -*-
|
|
|
"""test LoadBalancedView objects
|
|
|
|
|
|
Authors:
|
|
|
|
|
|
* Min RK
|
|
|
"""
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Copyright (C) 2011 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
import sys
|
|
|
import time
|
|
|
|
|
|
import zmq
|
|
|
from nose import SkipTest
|
|
|
from nose.plugins.attrib import attr
|
|
|
|
|
|
from IPython import parallel as pmod
|
|
|
from ipython_parallel import error
|
|
|
|
|
|
from ipython_parallel.tests import add_engines
|
|
|
|
|
|
from .clienttest import ClusterTestCase, crash, wait, skip_without
|
|
|
|
|
|
def setup():
|
|
|
add_engines(3, total=True)
|
|
|
|
|
|
class TestLoadBalancedView(ClusterTestCase):
|
|
|
|
|
|
def setUp(self):
|
|
|
ClusterTestCase.setUp(self)
|
|
|
self.view = self.client.load_balanced_view()
|
|
|
|
|
|
@attr('crash')
|
|
|
def test_z_crash_task(self):
|
|
|
"""test graceful handling of engine death (balanced)"""
|
|
|
# self.add_engines(1)
|
|
|
ar = self.view.apply_async(crash)
|
|
|
self.assertRaisesRemote(error.EngineError, ar.get, 10)
|
|
|
eid = ar.engine_id
|
|
|
tic = time.time()
|
|
|
while eid in self.client.ids and time.time()-tic < 5:
|
|
|
time.sleep(.01)
|
|
|
self.client.spin()
|
|
|
self.assertFalse(eid in self.client.ids, "Engine should have died")
|
|
|
|
|
|
def test_map(self):
|
|
|
def f(x):
|
|
|
return x**2
|
|
|
data = list(range(16))
|
|
|
r = self.view.map_sync(f, data)
|
|
|
self.assertEqual(r, list(map(f, data)))
|
|
|
|
|
|
def test_map_generator(self):
|
|
|
def f(x):
|
|
|
return x**2
|
|
|
|
|
|
data = list(range(16))
|
|
|
r = self.view.map_sync(f, iter(data))
|
|
|
self.assertEqual(r, list(map(f, iter(data))))
|
|
|
|
|
|
def test_map_short_first(self):
|
|
|
def f(x,y):
|
|
|
if y is None:
|
|
|
return y
|
|
|
if x is None:
|
|
|
return x
|
|
|
return x*y
|
|
|
data = list(range(10))
|
|
|
data2 = list(range(4))
|
|
|
|
|
|
r = self.view.map_sync(f, data, data2)
|
|
|
self.assertEqual(r, list(map(f, data, data2)))
|
|
|
|
|
|
def test_map_short_last(self):
|
|
|
def f(x,y):
|
|
|
if y is None:
|
|
|
return y
|
|
|
if x is None:
|
|
|
return x
|
|
|
return x*y
|
|
|
data = list(range(4))
|
|
|
data2 = list(range(10))
|
|
|
|
|
|
r = self.view.map_sync(f, data, data2)
|
|
|
self.assertEqual(r, list(map(f, data, data2)))
|
|
|
|
|
|
def test_map_unordered(self):
|
|
|
def f(x):
|
|
|
return x**2
|
|
|
def slow_f(x):
|
|
|
import time
|
|
|
time.sleep(0.05*x)
|
|
|
return x**2
|
|
|
data = list(range(16,0,-1))
|
|
|
reference = list(map(f, data))
|
|
|
|
|
|
amr = self.view.map_async(slow_f, data, ordered=False)
|
|
|
self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
|
|
|
# check individual elements, retrieved as they come
|
|
|
# list comprehension uses __iter__
|
|
|
astheycame = [ r for r in amr ]
|
|
|
# Ensure that at least one result came out of order:
|
|
|
self.assertNotEqual(astheycame, reference, "should not have preserved order")
|
|
|
self.assertEqual(sorted(astheycame, reverse=True), reference, "result corrupted")
|
|
|
|
|
|
def test_map_ordered(self):
|
|
|
def f(x):
|
|
|
return x**2
|
|
|
def slow_f(x):
|
|
|
import time
|
|
|
time.sleep(0.05*x)
|
|
|
return x**2
|
|
|
data = list(range(16,0,-1))
|
|
|
reference = list(map(f, data))
|
|
|
|
|
|
amr = self.view.map_async(slow_f, data)
|
|
|
self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
|
|
|
# check individual elements, retrieved as they come
|
|
|
# list(amr) uses __iter__
|
|
|
astheycame = list(amr)
|
|
|
# Ensure that results came in order
|
|
|
self.assertEqual(astheycame, reference)
|
|
|
self.assertEqual(amr.result, reference)
|
|
|
|
|
|
def test_map_iterable(self):
|
|
|
"""test map on iterables (balanced)"""
|
|
|
view = self.view
|
|
|
# 101 is prime, so it won't be evenly distributed
|
|
|
arr = range(101)
|
|
|
# so that it will be an iterator, even in Python 3
|
|
|
it = iter(arr)
|
|
|
r = view.map_sync(lambda x:x, arr)
|
|
|
self.assertEqual(r, list(arr))
|
|
|
|
|
|
|
|
|
def test_abort(self):
|
|
|
view = self.view
|
|
|
ar = self.client[:].apply_async(time.sleep, .5)
|
|
|
ar = self.client[:].apply_async(time.sleep, .5)
|
|
|
time.sleep(0.2)
|
|
|
ar2 = view.apply_async(lambda : 2)
|
|
|
ar3 = view.apply_async(lambda : 3)
|
|
|
view.abort(ar2)
|
|
|
view.abort(ar3.msg_ids)
|
|
|
self.assertRaises(error.TaskAborted, ar2.get)
|
|
|
self.assertRaises(error.TaskAborted, ar3.get)
|
|
|
|
|
|
def test_retries(self):
|
|
|
self.minimum_engines(3)
|
|
|
view = self.view
|
|
|
def fail():
|
|
|
assert False
|
|
|
for r in range(len(self.client)-1):
|
|
|
with view.temp_flags(retries=r):
|
|
|
self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
|
|
|
|
|
|
with view.temp_flags(retries=len(self.client), timeout=0.1):
|
|
|
self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
|
|
|
|
|
|
def test_short_timeout(self):
|
|
|
self.minimum_engines(2)
|
|
|
view = self.view
|
|
|
def fail():
|
|
|
import time
|
|
|
time.sleep(0.25)
|
|
|
assert False
|
|
|
with view.temp_flags(retries=1, timeout=0.01):
|
|
|
self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
|
|
|
|
|
|
def test_invalid_dependency(self):
|
|
|
view = self.view
|
|
|
with view.temp_flags(after='12345'):
|
|
|
self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
|
|
|
|
|
|
def test_impossible_dependency(self):
|
|
|
self.minimum_engines(2)
|
|
|
view = self.client.load_balanced_view()
|
|
|
ar1 = view.apply_async(lambda : 1)
|
|
|
ar1.get()
|
|
|
e1 = ar1.engine_id
|
|
|
e2 = e1
|
|
|
while e2 == e1:
|
|
|
ar2 = view.apply_async(lambda : 1)
|
|
|
ar2.get()
|
|
|
e2 = ar2.engine_id
|
|
|
|
|
|
with view.temp_flags(follow=[ar1, ar2]):
|
|
|
self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
|
|
|
|
|
|
|
|
|
def test_follow(self):
|
|
|
ar = self.view.apply_async(lambda : 1)
|
|
|
ar.get()
|
|
|
ars = []
|
|
|
first_id = ar.engine_id
|
|
|
|
|
|
self.view.follow = ar
|
|
|
for i in range(5):
|
|
|
ars.append(self.view.apply_async(lambda : 1))
|
|
|
self.view.wait(ars)
|
|
|
for ar in ars:
|
|
|
self.assertEqual(ar.engine_id, first_id)
|
|
|
|
|
|
def test_after(self):
|
|
|
view = self.view
|
|
|
ar = view.apply_async(time.sleep, 0.5)
|
|
|
with view.temp_flags(after=ar):
|
|
|
ar2 = view.apply_async(lambda : 1)
|
|
|
|
|
|
ar.wait()
|
|
|
ar2.wait()
|
|
|
self.assertTrue(ar2.started >= ar.completed, "%s not >= %s"%(ar.started, ar.completed))
|
|
|
|