@@ -0,0 +1,120 @@
+"""test LoadBalancedView objects"""
+# -*- coding: utf-8 -*-
+#-------------------------------------------------------------------------------
+# Copyright (C) 2011 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#-------------------------------------------------------------------------------
+
+#-------------------------------------------------------------------------------
+# Imports
+#-------------------------------------------------------------------------------
+
+import sys
+import time
+
+import zmq
+
+from IPython import parallel as pmod
+from IPython.parallel import error
+
+from IPython.parallel.tests import add_engines
+
+from .clienttest import ClusterTestCase, crash, wait, skip_without
+
+def setup():
+    add_engines(3)
+
+class TestLoadBalancedView(ClusterTestCase):
+
+    def setUp(self):
+        ClusterTestCase.setUp(self)
+        self.view = self.client.load_balanced_view()
+
+    def test_z_crash_task(self):
+        """test graceful handling of engine death (balanced)"""
+        # self.add_engines(1)
+        ar = self.view.apply_async(crash)
+        self.assertRaisesRemote(error.EngineError, ar.get)
+        eid = ar.engine_id
+        tic = time.time()
+        while eid in self.client.ids and time.time()-tic < 5:
+            time.sleep(.01)
+            self.client.spin()
+        self.assertFalse(eid in self.client.ids, "Engine should have died")
+
+    def test_map(self):
+        def f(x):
+            return x**2
+        data = range(16)
+        r = self.view.map_sync(f, data)
+        self.assertEquals(r, map(f, data))
+
+    def test_abort(self):
+        view = self.view
+        ar = self.client[:].apply_async(time.sleep, .5)
+        ar2 = view.apply_async(lambda : 2)
+        ar3 = view.apply_async(lambda : 3)
+        view.abort(ar2)
+        view.abort(ar3.msg_ids)
+        self.assertRaises(error.TaskAborted, ar2.get)
+        self.assertRaises(error.TaskAborted, ar3.get)
+
+    def test_retries(self):
+        add_engines(3)
+        view = self.view
+        view.timeout = 1 # prevent hang if this doesn't behave
+        def fail():
+            assert False
+        for r in range(len(self.client)-1):
+            with view.temp_flags(retries=r):
+                self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
+
+        with view.temp_flags(retries=len(self.client), timeout=0.25):
+            self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
+
+    def test_invalid_dependency(self):
+        view = self.view
+        with view.temp_flags(after='12345'):
+            self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
+
+    def test_impossible_dependency(self):
+        if len(self.client) < 2:
+            add_engines(2)
+        view = self.client.load_balanced_view()
+        ar1 = view.apply_async(lambda : 1)
+        ar1.get()
+        e1 = ar1.engine_id
+        e2 = e1
+        while e2 == e1:
+            ar2 = view.apply_async(lambda : 1)
+            ar2.get()
+            e2 = ar2.engine_id
+
+        with view.temp_flags(follow=[ar1, ar2]):
+            self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
+
+
+    def test_follow(self):
+        ar = self.view.apply_async(lambda : 1)
+        ar.get()
+        ars = []
+        first_id = ar.engine_id
+
+        self.view.follow = ar
+        for i in range(5):
+            ars.append(self.view.apply_async(lambda : 1))
+        self.view.wait(ars)
+        for ar in ars:
+            self.assertEquals(ar.engine_id, first_id)
+
+    def test_after(self):
+        view = self.view
+        ar = view.apply_async(time.sleep, 0.5)
+        with view.temp_flags(after=ar):
+            ar2 = view.apply_async(lambda : 1)
+
+        ar.wait()
+        ar2.wait()
+        self.assertTrue(ar2.started > ar.completed)
@@ -19,7 +19,7 @@ from types import ModuleType
 import zmq
 
 from IPython.testing import decorators as testdec
-from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat
+from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat, CInt
 
 from IPython.external.decorator import decorator
 
@@ -791,9 +791,10 @@ class LoadBalancedView(View):
     follow=Any()
     after=Any()
     timeout=CFloat()
+    retries = CInt(0)
 
     _task_scheme = Any()
-    _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout'])
+    _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
 
     def __init__(self, client=None, socket=None, **flags):
         super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
@@ -851,7 +852,7 @@ class LoadBalancedView(View):
             whether to create a MessageTracker to allow the user to
             safely edit after arrays and buffers during non-copying
             sends.
-        #
+
         after : Dependency or collection of msg_ids
             Only for load-balanced execution (targets=None)
             Specify a list of msg_ids as a time-based dependency.
@@ -869,6 +870,9 @@ class LoadBalancedView(View):
             Specify an amount of time (in seconds) for the scheduler to
             wait for dependencies to be met before failing with a
             DependencyTimeout.
+
+        retries : int
+            Number of times a task will be retried on failure.
         """
 
         super(LoadBalancedView, self).set_flags(**kwargs)
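
Usage sketch (illustrative only, not part of the changeset): the new `retries` flag documented above is exercised by `test_retries` in the new test file at the top of this diff. A minimal client-side example, assuming a running cluster and using only calls that appear in this diff, might look like:

    from IPython import parallel as pmod

    rc = pmod.Client()                 # connect to a running cluster
    view = rc.load_balanced_view()

    def fail():
        assert False                   # a task that always fails

    view.retries = 1                   # default for this view: resubmit each failed task once

    # or per-call, as in test_retries:
    with view.temp_flags(retries=2, timeout=5):
        ar = view.apply_async(fail)    # resubmitted up to twice before the failure
                                       # is finally returned to the client via ar.get()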
@@ -892,7 +896,7 @@ class LoadBalancedView(View):
     @save_ids
     def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
                         after=None, follow=None, timeout=None,
-                        targets=None):
+                        targets=None, retries=None):
         """calls f(*args, **kwargs) on a remote engine, returning the result.
 
         This method temporarily sets all of `apply`'s flags for a single call.
@@ -933,10 +937,11 @@ class LoadBalancedView(View):
             raise RuntimeError(msg)
 
         if self._task_scheme == 'pure':
-            # pure zmq scheme doesn't support
-            msg = "Pure ZMQ scheduler doesn't support
-            if (follow or after):
-                # hard fail on DAG dependencies
+            # pure zmq scheme doesn't support extra features
+            msg = "Pure ZMQ scheduler doesn't support the following flags:"
+            "follow, after, retries, targets, timeout"
+            if (follow or after or retries or targets or timeout):
+                # hard fail on Scheduler flags
                 raise RuntimeError(msg)
             if isinstance(f, dependent):
                 # soft warn on functional dependencies
@@ -948,10 +953,14 @@ class LoadBalancedView(View):
         block = self.block if block is None else block
         track = self.track if track is None else track
         after = self.after if after is None else after
+        retries = self.retries if retries is None else retries
         follow = self.follow if follow is None else follow
         timeout = self.timeout if timeout is None else timeout
         targets = self.targets if targets is None else targets
 
+        if not isinstance(retries, int):
+            raise TypeError('retries must be int, not %r'%type(retries))
+
         if targets is None:
             idents = []
         else:
@@ -959,7 +968,7 @@ class LoadBalancedView(View):
 
         after = self._render_dependency(after)
         follow = self._render_dependency(follow)
-        subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
+        subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
 
         msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
                                 subheader=subheader)
@@ -137,6 +137,7 @@ class TaskScheduler(SessionFactory):
 
     # internals:
     graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
+    retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
     # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
     depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
     pending = Dict() # dict by engine_uuid of submitted tasks
@@ -205,6 +206,8 @@ class TaskScheduler(SessionFactory):
         self.pending[uid] = {}
         if len(self.targets) == 1:
             self.resume_receiving()
+        # rescan the graph:
+        self.update_graph(None)
 
     def _unregister_engine(self, uid):
         """Existing engine with ident `uid` became unavailable."""
@@ -215,11 +218,11 @@ class TaskScheduler(SessionFactory):
         # handle any potentially finished tasks:
         self.engine_stream.flush()
 
-        self.completed.pop(uid)
-        self.failed.pop(uid)
-        # don't pop destinations, because it might be used later
+        # don't pop destinations, because they might be used later
         # map(self.destinations.pop, self.completed.pop(uid))
         # map(self.destinations.pop, self.failed.pop(uid))
+
+        # prevent this engine from receiving work
         idx = self.targets.index(uid)
         self.targets.pop(idx)
         self.loads.pop(idx)
@@ -229,28 +232,40 @@ class TaskScheduler(SessionFactory):
         if self.pending[uid]:
             dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
             dc.start()
+        else:
+            self.completed.pop(uid)
+            self.failed.pop(uid)
+
 
     @logged
     def handle_stranded_tasks(self, engine):
         """Deal with jobs resident in an engine that died."""
-        lost = self.pending
-
-        for msg_id, (raw_msg, targets, MET, follow, timeout) in lost.iteritems():
-            self.all_failed.add(msg_id)
-            self.all_done.add(msg_id)
+        lost = self.pending[engine]
+        for msg_id in lost.keys():
+            if msg_id not in self.pending[engine]:
+                # prevent double-handling of messages
+                continue
+
+            raw_msg = lost[msg_id][0]
+
             idents,msg = self.session.feed_identities(raw_msg, copy=False)
             msg = self.session.unpack_message(msg, copy=False, content=False)
             parent = msg['header']
-            idents = [idents[0]
-            # print (idents)
+            idents = [engine, idents[0]]
+
+            # build fake error reply
             try:
                 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
             except:
                 content = error.wrap_exception()
-            msg = self.session.
-                                parent=parent, ident=idents)
-            self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
-            self.
+            msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
+            raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
+            # and dispatch it
+            self.dispatch_result(raw_reply)
+
+        # finally scrub completed/failed lists
+        self.completed.pop(engine)
+        self.failed.pop(engine)
 
 
     #-----------------------------------------------------------------------
@@ -277,6 +292,8 @@ class TaskScheduler(SessionFactory):
 
         # targets
         targets = set(header.get('targets', []))
+        retries = header.get('retries', 0)
+        self.retries[msg_id] = retries
 
         # time dependencies
         after = Dependency(header.get('after', []))
@@ -315,7 +332,9 @@ class TaskScheduler(SessionFactory):
             # time deps already met, try to run
             if not self.maybe_run(msg_id, *args):
                 # can't run yet
-                self.save_unmet(msg_id, *args)
+                if msg_id not in self.all_failed:
+                    # could have failed as unreachable
+                    self.save_unmet(msg_id, *args)
         else:
             self.save_unmet(msg_id, *args)
 
@@ -328,7 +347,7 @@ class TaskScheduler(SessionFactory):
         if msg_id in self.depending:
             raw,after,targets,follow,timeout = self.depending[msg_id]
             if timeout and timeout < now:
-                self.fail_unreachable(msg_id,
+                self.fail_unreachable(msg_id, error.TaskTimeout)
 
     @logged
     def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
@@ -369,7 +388,7 @@ class TaskScheduler(SessionFactory):
         # we need a can_run filter
         def can_run(idx):
            # check hwm
-            if self.loads[idx] == self.hwm:
+            if self.hwm and self.loads[idx] == self.hwm:
                 return False
             target = self.targets[idx]
             # check blacklist
@@ -382,6 +401,7 @@ class TaskScheduler(SessionFactory):
             return follow.check(self.completed[target], self.failed[target])
 
         indices = filter(can_run, range(len(self.targets)))
+
         if not indices:
             # couldn't run
             if follow.all:
@@ -395,12 +415,14 @@ class TaskScheduler(SessionFactory):
                 for m in follow.intersection(relevant):
                     dests.add(self.destinations[m])
                 if len(dests) > 1:
+                    self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                     self.fail_unreachable(msg_id)
                     return False
             if targets:
                 # check blacklist+targets for impossibility
                 targets.difference_update(blacklist)
                 if not targets or not targets.intersection(self.targets):
+                    self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                     self.fail_unreachable(msg_id)
                     return False
         return False
@@ -454,20 +476,34 @@ class TaskScheduler(SessionFactory):
             idents,msg = self.session.feed_identities(raw_msg, copy=False)
             msg = self.session.unpack_message(msg, content=False, copy=False)
             engine = idents[0]
-            idx = self.targets.index(engine)
-            self.finish_job(idx)
+            try:
+                idx = self.targets.index(engine)
+            except ValueError:
+                pass # skip load-update for dead engines
+            else:
+                self.finish_job(idx)
         except Exception:
             self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
             return
 
         header = msg['header']
+        parent = msg['parent_header']
         if header.get('dependencies_met', True):
             success = (header['status'] == 'ok')
-            self.handle_result(idents, msg['parent_header'], raw_msg, success)
-            # send to Hub monitor
-            self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
+            msg_id = parent['msg_id']
+            retries = self.retries[msg_id]
+            if not success and retries > 0:
+                # failed
+                self.retries[msg_id] = retries - 1
+                self.handle_unmet_dependency(idents, parent)
+            else:
+                del self.retries[msg_id]
+                # relay to client and update graph
+                self.handle_result(idents, parent, raw_msg, success)
+                # send to Hub monitor
+                self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
         else:
-            self.handle_unmet_dependency(idents,
+            self.handle_unmet_dependency(idents, parent)
 
     @logged
     def handle_result(self, idents, parent, raw_msg, success=True):
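
A toy, self-contained model of the retry bookkeeping added above (plain Python for illustration, not IPython code): `dispatch_submission` seeds a per-task retry count from the message header, and `dispatch_result` consumes one retry per failed reply before the failure is finally relayed to the client.

    # msg_id -> retries remaining, as in TaskScheduler.retries
    retries = {}

    def submit(msg_id, n_retries):
        # dispatch_submission stores header.get('retries', 0)
        retries[msg_id] = n_retries

    def on_reply(msg_id, success):
        """Return 'resubmit' or 'relay', mirroring the dispatch_result branch."""
        remaining = retries[msg_id]
        if not success and remaining > 0:
            retries[msg_id] = remaining - 1
            return 'resubmit'
        del retries[msg_id]
        return 'relay'

    submit('a', 2)
    assert on_reply('a', False) == 'resubmit'   # 1 retry left
    assert on_reply('a', False) == 'resubmit'   # 0 retries left
    assert on_reply('a', False) == 'relay'      # failure finally reported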
@@ -511,13 +547,19 @@ class TaskScheduler(SessionFactory):
             self.depending[msg_id] = args
             self.fail_unreachable(msg_id)
         elif not self.maybe_run(msg_id, *args):
             # resubmit failed
-            self.save_unmet(msg_id, *args)
+            if msg_id not in self.all_failed:
+                # put it back in our dependency tree
+                self.save_unmet(msg_id, *args)
 
         if self.hwm:
-            idx = self.targets.index(engine)
-            if self.loads[idx] == self.hwm-1:
-                self.update_graph(None)
+            try:
+                idx = self.targets.index(engine)
+            except ValueError:
+                pass # skip load-update for dead engines
+            else:
+                if self.loads[idx] == self.hwm-1:
+                    self.update_graph(None)
 
 
 
@@ -526,7 +568,7 @@ class TaskScheduler(SessionFactory):
         """dep_id just finished. Update our dependency
         graph and submit any jobs that just became runable.
 
-        Called with dep_id=None to update graph for hwm, but without finishing
+        Called with dep_id=None to update entire graph for hwm, but without finishing
         a task.
         """
         # print ("\n\n***********")
@@ -538,9 +580,11 @@ class TaskScheduler(SessionFactory):
         # print ("\n\n***********\n\n")
         # update any jobs that depended on the dependency
        jobs = self.graph.pop(dep_id, [])
-        # if we have HWM and an engine just become no longer full
-        # recheck *all* jobs
-        if self.hwm and any( [ load==self.hwm-1 for load in self.loads]):
+
+        # recheck *all* jobs if
+        # a) we have HWM and an engine just become no longer full
+        # or b) dep_id was given as None
+        if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
             jobs = self.depending.keys()
 
         for msg_id in jobs:
@@ -48,7 +48,7 @@ class TestProcessLauncher(LocalProcessLauncher):
 def setup():
     cp = TestProcessLauncher()
     cp.cmd_and_args = ipcontroller_cmd_argv + \
-                ['--profile', 'iptest', '--log-level', '99', '-r'
+                ['--profile', 'iptest', '--log-level', '99', '-r']
     cp.start()
     launchers.append(cp)
     cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
@@ -21,7 +21,7 @@ import zmq
 from IPython import parallel as pmod
 from IPython.parallel import error
 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
-from IPython.parallel import
+from IPython.parallel import DirectView
 from IPython.parallel.util import interactive
 
 from IPython.parallel.tests import add_engines
@@ -33,18 +33,6 @@ def setup():
 
 class TestView(ClusterTestCase):
 
-    def test_z_crash_task(self):
-        """test graceful handling of engine death (balanced)"""
-        # self.add_engines(1)
-        ar = self.client[-1].apply_async(crash)
-        self.assertRaisesRemote(error.EngineError, ar.get)
-        eid = ar.engine_id
-        tic = time.time()
-        while eid in self.client.ids and time.time()-tic < 5:
-            time.sleep(.01)
-            self.client.spin()
-        self.assertFalse(eid in self.client.ids, "Engine should have died")
-
     def test_z_crash_mux(self):
         """test graceful handling of engine death (direct)"""
         # self.add_engines(1)