##// END OF EJS Templates
add retries flag to LoadBalancedView...
MinRK -
Show More
@@ -0,0 +1,120 b''
1 """test LoadBalancedView objects"""
2 # -*- coding: utf-8 -*-
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2011 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
14 import sys
15 import time
16
17 import zmq
18
19 from IPython import parallel as pmod
20 from IPython.parallel import error
21
22 from IPython.parallel.tests import add_engines
23
24 from .clienttest import ClusterTestCase, crash, wait, skip_without
25
26 def setup():
27 add_engines(3)
28
29 class TestLoadBalancedView(ClusterTestCase):
30
31 def setUp(self):
32 ClusterTestCase.setUp(self)
33 self.view = self.client.load_balanced_view()
34
35 def test_z_crash_task(self):
36 """test graceful handling of engine death (balanced)"""
37 # self.add_engines(1)
38 ar = self.view.apply_async(crash)
39 self.assertRaisesRemote(error.EngineError, ar.get)
40 eid = ar.engine_id
41 tic = time.time()
42 while eid in self.client.ids and time.time()-tic < 5:
43 time.sleep(.01)
44 self.client.spin()
45 self.assertFalse(eid in self.client.ids, "Engine should have died")
46
47 def test_map(self):
48 def f(x):
49 return x**2
50 data = range(16)
51 r = self.view.map_sync(f, data)
52 self.assertEquals(r, map(f, data))
53
54 def test_abort(self):
55 view = self.view
56 ar = self.client[:].apply_async(time.sleep, .5)
57 ar2 = view.apply_async(lambda : 2)
58 ar3 = view.apply_async(lambda : 3)
59 view.abort(ar2)
60 view.abort(ar3.msg_ids)
61 self.assertRaises(error.TaskAborted, ar2.get)
62 self.assertRaises(error.TaskAborted, ar3.get)
63
64 def test_retries(self):
65 add_engines(3)
66 view = self.view
67 view.timeout = 1 # prevent hang if this doesn't behave
68 def fail():
69 assert False
70 for r in range(len(self.client)-1):
71 with view.temp_flags(retries=r):
72 self.assertRaisesRemote(AssertionError, view.apply_sync, fail)
73
74 with view.temp_flags(retries=len(self.client), timeout=0.25):
75 self.assertRaisesRemote(error.TaskTimeout, view.apply_sync, fail)
76
77 def test_invalid_dependency(self):
78 view = self.view
79 with view.temp_flags(after='12345'):
80 self.assertRaisesRemote(error.InvalidDependency, view.apply_sync, lambda : 1)
81
82 def test_impossible_dependency(self):
83 if len(self.client) < 2:
84 add_engines(2)
85 view = self.client.load_balanced_view()
86 ar1 = view.apply_async(lambda : 1)
87 ar1.get()
88 e1 = ar1.engine_id
89 e2 = e1
90 while e2 == e1:
91 ar2 = view.apply_async(lambda : 1)
92 ar2.get()
93 e2 = ar2.engine_id
94
95 with view.temp_flags(follow=[ar1, ar2]):
96 self.assertRaisesRemote(error.ImpossibleDependency, view.apply_sync, lambda : 1)
97
98
99 def test_follow(self):
100 ar = self.view.apply_async(lambda : 1)
101 ar.get()
102 ars = []
103 first_id = ar.engine_id
104
105 self.view.follow = ar
106 for i in range(5):
107 ars.append(self.view.apply_async(lambda : 1))
108 self.view.wait(ars)
109 for ar in ars:
110 self.assertEquals(ar.engine_id, first_id)
111
112 def test_after(self):
113 view = self.view
114 ar = view.apply_async(time.sleep, 0.5)
115 with view.temp_flags(after=ar):
116 ar2 = view.apply_async(lambda : 1)
117
118 ar.wait()
119 ar2.wait()
120 self.assertTrue(ar2.started > ar.completed)
@@ -19,7 +19,7 b' from types import ModuleType'
19 import zmq
19 import zmq
20
20
21 from IPython.testing import decorators as testdec
21 from IPython.testing import decorators as testdec
22 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat
22 from IPython.utils.traitlets import HasTraits, Any, Bool, List, Dict, Set, Int, Instance, CFloat, CInt
23
23
24 from IPython.external.decorator import decorator
24 from IPython.external.decorator import decorator
25
25
@@ -791,9 +791,10 b' class LoadBalancedView(View):'
791 follow=Any()
791 follow=Any()
792 after=Any()
792 after=Any()
793 timeout=CFloat()
793 timeout=CFloat()
794 retries = CInt(0)
794
795
795 _task_scheme = Any()
796 _task_scheme = Any()
796 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout'])
797 _flag_names = List(['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries'])
797
798
798 def __init__(self, client=None, socket=None, **flags):
799 def __init__(self, client=None, socket=None, **flags):
799 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
800 super(LoadBalancedView, self).__init__(client=client, socket=socket, **flags)
@@ -851,7 +852,7 b' class LoadBalancedView(View):'
851 whether to create a MessageTracker to allow the user to
852 whether to create a MessageTracker to allow the user to
852 safely edit after arrays and buffers during non-copying
853 safely edit after arrays and buffers during non-copying
853 sends.
854 sends.
854 #
855
855 after : Dependency or collection of msg_ids
856 after : Dependency or collection of msg_ids
856 Only for load-balanced execution (targets=None)
857 Only for load-balanced execution (targets=None)
857 Specify a list of msg_ids as a time-based dependency.
858 Specify a list of msg_ids as a time-based dependency.
@@ -869,6 +870,9 b' class LoadBalancedView(View):'
869 Specify an amount of time (in seconds) for the scheduler to
870 Specify an amount of time (in seconds) for the scheduler to
870 wait for dependencies to be met before failing with a
871 wait for dependencies to be met before failing with a
871 DependencyTimeout.
872 DependencyTimeout.
873
874 retries : int
875 Number of times a task will be retried on failure.
872 """
876 """
873
877
874 super(LoadBalancedView, self).set_flags(**kwargs)
878 super(LoadBalancedView, self).set_flags(**kwargs)
@@ -892,7 +896,7 b' class LoadBalancedView(View):'
892 @save_ids
896 @save_ids
893 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
897 def _really_apply(self, f, args=None, kwargs=None, block=None, track=None,
894 after=None, follow=None, timeout=None,
898 after=None, follow=None, timeout=None,
895 targets=None):
899 targets=None, retries=None):
896 """calls f(*args, **kwargs) on a remote engine, returning the result.
900 """calls f(*args, **kwargs) on a remote engine, returning the result.
897
901
898 This method temporarily sets all of `apply`'s flags for a single call.
902 This method temporarily sets all of `apply`'s flags for a single call.
@@ -933,10 +937,11 b' class LoadBalancedView(View):'
933 raise RuntimeError(msg)
937 raise RuntimeError(msg)
934
938
935 if self._task_scheme == 'pure':
939 if self._task_scheme == 'pure':
936 # pure zmq scheme doesn't support dependencies
940 # pure zmq scheme doesn't support extra features
937 msg = "Pure ZMQ scheduler doesn't support dependencies"
941 msg = "Pure ZMQ scheduler doesn't support the following flags:"
938 if (follow or after):
942 "follow, after, retries, targets, timeout"
939 # hard fail on DAG dependencies
943 if (follow or after or retries or targets or timeout):
944 # hard fail on Scheduler flags
940 raise RuntimeError(msg)
945 raise RuntimeError(msg)
941 if isinstance(f, dependent):
946 if isinstance(f, dependent):
942 # soft warn on functional dependencies
947 # soft warn on functional dependencies
@@ -948,10 +953,14 b' class LoadBalancedView(View):'
948 block = self.block if block is None else block
953 block = self.block if block is None else block
949 track = self.track if track is None else track
954 track = self.track if track is None else track
950 after = self.after if after is None else after
955 after = self.after if after is None else after
956 retries = self.retries if retries is None else retries
951 follow = self.follow if follow is None else follow
957 follow = self.follow if follow is None else follow
952 timeout = self.timeout if timeout is None else timeout
958 timeout = self.timeout if timeout is None else timeout
953 targets = self.targets if targets is None else targets
959 targets = self.targets if targets is None else targets
954
960
961 if not isinstance(retries, int):
962 raise TypeError('retries must be int, not %r'%type(retries))
963
955 if targets is None:
964 if targets is None:
956 idents = []
965 idents = []
957 else:
966 else:
@@ -959,7 +968,7 b' class LoadBalancedView(View):'
959
968
960 after = self._render_dependency(after)
969 after = self._render_dependency(after)
961 follow = self._render_dependency(follow)
970 follow = self._render_dependency(follow)
962 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
971 subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents, retries=retries)
963
972
964 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
973 msg = self.client.send_apply_message(self._socket, f, args, kwargs, track=track,
965 subheader=subheader)
974 subheader=subheader)
@@ -137,6 +137,7 b' class TaskScheduler(SessionFactory):'
137
137
138 # internals:
138 # internals:
139 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
139 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
140 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
140 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
141 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
141 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
142 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
142 pending = Dict() # dict by engine_uuid of submitted tasks
143 pending = Dict() # dict by engine_uuid of submitted tasks
@@ -205,6 +206,8 b' class TaskScheduler(SessionFactory):'
205 self.pending[uid] = {}
206 self.pending[uid] = {}
206 if len(self.targets) == 1:
207 if len(self.targets) == 1:
207 self.resume_receiving()
208 self.resume_receiving()
209 # rescan the graph:
210 self.update_graph(None)
208
211
209 def _unregister_engine(self, uid):
212 def _unregister_engine(self, uid):
210 """Existing engine with ident `uid` became unavailable."""
213 """Existing engine with ident `uid` became unavailable."""
@@ -215,11 +218,11 b' class TaskScheduler(SessionFactory):'
215 # handle any potentially finished tasks:
218 # handle any potentially finished tasks:
216 self.engine_stream.flush()
219 self.engine_stream.flush()
217
220
218 self.completed.pop(uid)
221 # don't pop destinations, because they might be used later
219 self.failed.pop(uid)
220 # don't pop destinations, because it might be used later
221 # map(self.destinations.pop, self.completed.pop(uid))
222 # map(self.destinations.pop, self.completed.pop(uid))
222 # map(self.destinations.pop, self.failed.pop(uid))
223 # map(self.destinations.pop, self.failed.pop(uid))
224
225 # prevent this engine from receiving work
223 idx = self.targets.index(uid)
226 idx = self.targets.index(uid)
224 self.targets.pop(idx)
227 self.targets.pop(idx)
225 self.loads.pop(idx)
228 self.loads.pop(idx)
@@ -229,28 +232,40 b' class TaskScheduler(SessionFactory):'
229 if self.pending[uid]:
232 if self.pending[uid]:
230 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
233 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
231 dc.start()
234 dc.start()
235 else:
236 self.completed.pop(uid)
237 self.failed.pop(uid)
238
232
239
233 @logged
240 @logged
234 def handle_stranded_tasks(self, engine):
241 def handle_stranded_tasks(self, engine):
235 """Deal with jobs resident in an engine that died."""
242 """Deal with jobs resident in an engine that died."""
236 lost = self.pending.pop(engine)
243 lost = self.pending[engine]
237
244 for msg_id in lost.keys():
238 for msg_id, (raw_msg, targets, MET, follow, timeout) in lost.iteritems():
245 if msg_id not in self.pending[engine]:
239 self.all_failed.add(msg_id)
246 # prevent double-handling of messages
240 self.all_done.add(msg_id)
247 continue
248
249 raw_msg = lost[msg_id][0]
250
241 idents,msg = self.session.feed_identities(raw_msg, copy=False)
251 idents,msg = self.session.feed_identities(raw_msg, copy=False)
242 msg = self.session.unpack_message(msg, copy=False, content=False)
252 msg = self.session.unpack_message(msg, copy=False, content=False)
243 parent = msg['header']
253 parent = msg['header']
244 idents = [idents[0],engine]+idents[1:]
254 idents = [engine, idents[0]]
245 # print (idents)
255
256 # build fake error reply
246 try:
257 try:
247 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
258 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
248 except:
259 except:
249 content = error.wrap_exception()
260 content = error.wrap_exception()
250 msg = self.session.send(self.client_stream, 'apply_reply', content,
261 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
251 parent=parent, ident=idents)
262 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
252 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
263 # and dispatch it
253 self.update_graph(msg_id)
264 self.dispatch_result(raw_reply)
265
266 # finally scrub completed/failed lists
267 self.completed.pop(engine)
268 self.failed.pop(engine)
254
269
255
270
256 #-----------------------------------------------------------------------
271 #-----------------------------------------------------------------------
@@ -277,6 +292,8 b' class TaskScheduler(SessionFactory):'
277
292
278 # targets
293 # targets
279 targets = set(header.get('targets', []))
294 targets = set(header.get('targets', []))
295 retries = header.get('retries', 0)
296 self.retries[msg_id] = retries
280
297
281 # time dependencies
298 # time dependencies
282 after = Dependency(header.get('after', []))
299 after = Dependency(header.get('after', []))
@@ -315,7 +332,9 b' class TaskScheduler(SessionFactory):'
315 # time deps already met, try to run
332 # time deps already met, try to run
316 if not self.maybe_run(msg_id, *args):
333 if not self.maybe_run(msg_id, *args):
317 # can't run yet
334 # can't run yet
318 self.save_unmet(msg_id, *args)
335 if msg_id not in self.all_failed:
336 # could have failed as unreachable
337 self.save_unmet(msg_id, *args)
319 else:
338 else:
320 self.save_unmet(msg_id, *args)
339 self.save_unmet(msg_id, *args)
321
340
@@ -328,7 +347,7 b' class TaskScheduler(SessionFactory):'
328 if msg_id in self.depending:
347 if msg_id in self.depending:
329 raw,after,targets,follow,timeout = self.depending[msg_id]
348 raw,after,targets,follow,timeout = self.depending[msg_id]
330 if timeout and timeout < now:
349 if timeout and timeout < now:
331 self.fail_unreachable(msg_id, timeout=True)
350 self.fail_unreachable(msg_id, error.TaskTimeout)
332
351
333 @logged
352 @logged
334 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
353 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
@@ -369,7 +388,7 b' class TaskScheduler(SessionFactory):'
369 # we need a can_run filter
388 # we need a can_run filter
370 def can_run(idx):
389 def can_run(idx):
371 # check hwm
390 # check hwm
372 if self.loads[idx] == self.hwm:
391 if self.hwm and self.loads[idx] == self.hwm:
373 return False
392 return False
374 target = self.targets[idx]
393 target = self.targets[idx]
375 # check blacklist
394 # check blacklist
@@ -382,6 +401,7 b' class TaskScheduler(SessionFactory):'
382 return follow.check(self.completed[target], self.failed[target])
401 return follow.check(self.completed[target], self.failed[target])
383
402
384 indices = filter(can_run, range(len(self.targets)))
403 indices = filter(can_run, range(len(self.targets)))
404
385 if not indices:
405 if not indices:
386 # couldn't run
406 # couldn't run
387 if follow.all:
407 if follow.all:
@@ -395,12 +415,14 b' class TaskScheduler(SessionFactory):'
395 for m in follow.intersection(relevant):
415 for m in follow.intersection(relevant):
396 dests.add(self.destinations[m])
416 dests.add(self.destinations[m])
397 if len(dests) > 1:
417 if len(dests) > 1:
418 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
398 self.fail_unreachable(msg_id)
419 self.fail_unreachable(msg_id)
399 return False
420 return False
400 if targets:
421 if targets:
401 # check blacklist+targets for impossibility
422 # check blacklist+targets for impossibility
402 targets.difference_update(blacklist)
423 targets.difference_update(blacklist)
403 if not targets or not targets.intersection(self.targets):
424 if not targets or not targets.intersection(self.targets):
425 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
404 self.fail_unreachable(msg_id)
426 self.fail_unreachable(msg_id)
405 return False
427 return False
406 return False
428 return False
@@ -454,20 +476,34 b' class TaskScheduler(SessionFactory):'
454 idents,msg = self.session.feed_identities(raw_msg, copy=False)
476 idents,msg = self.session.feed_identities(raw_msg, copy=False)
455 msg = self.session.unpack_message(msg, content=False, copy=False)
477 msg = self.session.unpack_message(msg, content=False, copy=False)
456 engine = idents[0]
478 engine = idents[0]
457 idx = self.targets.index(engine)
479 try:
458 self.finish_job(idx)
480 idx = self.targets.index(engine)
481 except ValueError:
482 pass # skip load-update for dead engines
483 else:
484 self.finish_job(idx)
459 except Exception:
485 except Exception:
460 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
486 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
461 return
487 return
462
488
463 header = msg['header']
489 header = msg['header']
490 parent = msg['parent_header']
464 if header.get('dependencies_met', True):
491 if header.get('dependencies_met', True):
465 success = (header['status'] == 'ok')
492 success = (header['status'] == 'ok')
466 self.handle_result(idents, msg['parent_header'], raw_msg, success)
493 msg_id = parent['msg_id']
467 # send to Hub monitor
494 retries = self.retries[msg_id]
468 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
495 if not success and retries > 0:
496 # failed
497 self.retries[msg_id] = retries - 1
498 self.handle_unmet_dependency(idents, parent)
499 else:
500 del self.retries[msg_id]
501 # relay to client and update graph
502 self.handle_result(idents, parent, raw_msg, success)
503 # send to Hub monitor
504 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
469 else:
505 else:
470 self.handle_unmet_dependency(idents, msg['parent_header'])
506 self.handle_unmet_dependency(idents, parent)
471
507
472 @logged
508 @logged
473 def handle_result(self, idents, parent, raw_msg, success=True):
509 def handle_result(self, idents, parent, raw_msg, success=True):
@@ -511,13 +547,19 b' class TaskScheduler(SessionFactory):'
511 self.depending[msg_id] = args
547 self.depending[msg_id] = args
512 self.fail_unreachable(msg_id)
548 self.fail_unreachable(msg_id)
513 elif not self.maybe_run(msg_id, *args):
549 elif not self.maybe_run(msg_id, *args):
514 # resubmit failed, put it back in our dependency tree
550 # resubmit failed
515 self.save_unmet(msg_id, *args)
551 if msg_id not in self.all_failed:
552 # put it back in our dependency tree
553 self.save_unmet(msg_id, *args)
516
554
517 if self.hwm:
555 if self.hwm:
518 idx = self.targets.index(engine)
556 try:
519 if self.loads[idx] == self.hwm-1:
557 idx = self.targets.index(engine)
520 self.update_graph(None)
558 except ValueError:
559 pass # skip load-update for dead engines
560 else:
561 if self.loads[idx] == self.hwm-1:
562 self.update_graph(None)
521
563
522
564
523
565
@@ -526,7 +568,7 b' class TaskScheduler(SessionFactory):'
526 """dep_id just finished. Update our dependency
568 """dep_id just finished. Update our dependency
527 graph and submit any jobs that just became runable.
569 graph and submit any jobs that just became runable.
528
570
529 Called with dep_id=None to update graph for hwm, but without finishing
571 Called with dep_id=None to update entire graph for hwm, but without finishing
530 a task.
572 a task.
531 """
573 """
532 # print ("\n\n***********")
574 # print ("\n\n***********")
@@ -538,9 +580,11 b' class TaskScheduler(SessionFactory):'
538 # print ("\n\n***********\n\n")
580 # print ("\n\n***********\n\n")
539 # update any jobs that depended on the dependency
581 # update any jobs that depended on the dependency
540 jobs = self.graph.pop(dep_id, [])
582 jobs = self.graph.pop(dep_id, [])
541 # if we have HWM and an engine just become no longer full
583
542 # recheck *all* jobs:
584 # recheck *all* jobs if
543 if self.hwm and any( [ load==self.hwm-1 for load in self.loads]):
585 # a) we have HWM and an engine just become no longer full
586 # or b) dep_id was given as None
587 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
544 jobs = self.depending.keys()
588 jobs = self.depending.keys()
545
589
546 for msg_id in jobs:
590 for msg_id in jobs:
@@ -48,7 +48,7 b' class TestProcessLauncher(LocalProcessLauncher):'
48 def setup():
48 def setup():
49 cp = TestProcessLauncher()
49 cp = TestProcessLauncher()
50 cp.cmd_and_args = ipcontroller_cmd_argv + \
50 cp.cmd_and_args = ipcontroller_cmd_argv + \
51 ['--profile', 'iptest', '--log-level', '99', '-r', '--usethreads']
51 ['--profile', 'iptest', '--log-level', '99', '-r']
52 cp.start()
52 cp.start()
53 launchers.append(cp)
53 launchers.append(cp)
54 cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
54 cluster_dir = os.path.join(get_ipython_dir(), 'cluster_iptest')
@@ -21,7 +21,7 b' import zmq'
21 from IPython import parallel as pmod
21 from IPython import parallel as pmod
22 from IPython.parallel import error
22 from IPython.parallel import error
23 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
23 from IPython.parallel import AsyncResult, AsyncHubResult, AsyncMapResult
24 from IPython.parallel import LoadBalancedView, DirectView
24 from IPython.parallel import DirectView
25 from IPython.parallel.util import interactive
25 from IPython.parallel.util import interactive
26
26
27 from IPython.parallel.tests import add_engines
27 from IPython.parallel.tests import add_engines
@@ -33,18 +33,6 b' def setup():'
33
33
34 class TestView(ClusterTestCase):
34 class TestView(ClusterTestCase):
35
35
36 def test_z_crash_task(self):
37 """test graceful handling of engine death (balanced)"""
38 # self.add_engines(1)
39 ar = self.client[-1].apply_async(crash)
40 self.assertRaisesRemote(error.EngineError, ar.get)
41 eid = ar.engine_id
42 tic = time.time()
43 while eid in self.client.ids and time.time()-tic < 5:
44 time.sleep(.01)
45 self.client.spin()
46 self.assertFalse(eid in self.client.ids, "Engine should have died")
47
48 def test_z_crash_mux(self):
36 def test_z_crash_mux(self):
49 """test graceful handling of engine death (direct)"""
37 """test graceful handling of engine death (direct)"""
50 # self.add_engines(1)
38 # self.add_engines(1)
General Comments 0
You need to be logged in to leave comments. Login now