Show More
@@ -17,10 +17,9 b' Authors:' | |||
|
17 | 17 | #----------------------------------------------------------------------------- |
|
18 | 18 | |
|
19 | 19 | from tornado import web |
|
20 | from zmq.eventloop import ioloop | |
|
21 | 20 | |
|
22 | 21 | from IPython.config.configurable import LoggingConfigurable |
|
23 |
from IPython.utils.traitlets import Dict, Instance, |
|
|
22 | from IPython.utils.traitlets import Dict, Instance, Float | |
|
24 | 23 | from IPython.core.profileapp import list_profiles_in |
|
25 | 24 | from IPython.core.profiledir import ProfileDir |
|
26 | 25 | from IPython.utils import py3compat |
@@ -38,7 +37,7 b' class ClusterManager(LoggingConfigurable):' | |||
|
38 | 37 | |
|
39 | 38 | profiles = Dict() |
|
40 | 39 | |
|
41 |
delay = |
|
|
40 | delay = Float(1., config=True, | |
|
42 | 41 | help="delay (in s) between starting the controller and the engines") |
|
43 | 42 | |
|
44 | 43 | loop = Instance('zmq.eventloop.ioloop.IOLoop') |
@@ -133,11 +132,13 b' class ClusterManager(LoggingConfigurable):' | |||
|
133 | 132 | esl.stop() |
|
134 | 133 | clean_data() |
|
135 | 134 | cl.on_stop(controller_stopped) |
|
135 | loop = self.loop | |
|
136 | 136 | |
|
137 | dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop) | |
|
138 | dc.start() | |
|
139 | dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop) | |
|
140 | dc.start() | |
|
137 | def start(): | |
|
138 | """start the controller, then the engines after a delay""" | |
|
139 | cl.start() | |
|
140 | loop.add_timeout(self.loop.time() + self.delay, lambda : esl.start(n)) | |
|
141 | self.loop.add_callback(start) | |
|
141 | 142 | |
|
142 | 143 | self.log.debug('Cluster started') |
|
143 | 144 | data['controller_launcher'] = cl |
@@ -1,32 +1,14 b'' | |||
|
1 | 1 | # encoding: utf-8 |
|
2 | 2 | """ |
|
3 | 3 | The Base Application class for IPython.parallel apps |
|
4 | ||
|
5 | Authors: | |
|
6 | ||
|
7 | * Brian Granger | |
|
8 | * Min RK | |
|
9 | ||
|
10 | 4 | """ |
|
11 | 5 | |
|
12 | #----------------------------------------------------------------------------- | |
|
13 | # Copyright (C) 2008-2011 The IPython Development Team | |
|
14 | # | |
|
15 | # Distributed under the terms of the BSD License. The full license is in | |
|
16 | # the file COPYING, distributed as part of this software. | |
|
17 | #----------------------------------------------------------------------------- | |
|
18 | ||
|
19 | #----------------------------------------------------------------------------- | |
|
20 | # Imports | |
|
21 | #----------------------------------------------------------------------------- | |
|
22 | 6 | |
|
23 | 7 | import os |
|
24 | 8 | import logging |
|
25 | 9 | import re |
|
26 | 10 | import sys |
|
27 | 11 | |
|
28 | from subprocess import Popen, PIPE | |
|
29 | ||
|
30 | 12 | from IPython.config.application import catch_config_error, LevelFormatter |
|
31 | 13 | from IPython.core import release |
|
32 | 14 | from IPython.core.crashhandler import CrashHandler |
@@ -1,27 +1,8 b'' | |||
|
1 | 1 | #!/usr/bin/env python |
|
2 | 2 | # encoding: utf-8 |
|
3 | """ | |
|
4 | The ipcluster application. | |
|
5 | ||
|
6 | Authors: | |
|
7 | ||
|
8 | * Brian Granger | |
|
9 | * MinRK | |
|
10 | ||
|
11 | """ | |
|
3 | """The ipcluster application.""" | |
|
12 | 4 | from __future__ import print_function |
|
13 | 5 | |
|
14 | #----------------------------------------------------------------------------- | |
|
15 | # Copyright (C) 2008-2011 The IPython Development Team | |
|
16 | # | |
|
17 | # Distributed under the terms of the BSD License. The full license is in | |
|
18 | # the file COPYING, distributed as part of this software. | |
|
19 | #----------------------------------------------------------------------------- | |
|
20 | ||
|
21 | #----------------------------------------------------------------------------- | |
|
22 | # Imports | |
|
23 | #----------------------------------------------------------------------------- | |
|
24 | ||
|
25 | 6 | import errno |
|
26 | 7 | import logging |
|
27 | 8 | import os |
@@ -30,9 +11,8 b' import signal' | |||
|
30 | 11 | |
|
31 | 12 | from subprocess import check_call, CalledProcessError, PIPE |
|
32 | 13 | import zmq |
|
33 | from zmq.eventloop import ioloop | |
|
34 | 14 | |
|
35 |
from IPython.config.application import |
|
|
15 | from IPython.config.application import catch_config_error | |
|
36 | 16 | from IPython.config.loader import Config |
|
37 | 17 | from IPython.core.application import BaseIPythonApplication |
|
38 | 18 | from IPython.core.profiledir import ProfileDir |
@@ -355,7 +335,7 b' class IPClusterEngines(BaseParallelApplication):' | |||
|
355 | 335 | raise |
|
356 | 336 | self.engine_launcher.on_stop(self.engines_stopped_early) |
|
357 | 337 | if self.early_shutdown: |
|
358 | ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start() | |
|
338 | self.loop.add_timeout(self.loop.time() + self.early_shutdown, self.engines_started_ok) | |
|
359 | 339 | |
|
360 | 340 | def engines_stopped_early(self, r): |
|
361 | 341 | if self.early_shutdown and not self._stopping: |
@@ -393,8 +373,7 b' class IPClusterEngines(BaseParallelApplication):' | |||
|
393 | 373 | self.log.error("IPython cluster: stopping") |
|
394 | 374 | self.stop_engines() |
|
395 | 375 | # Wait a few seconds to let things shut down. |
|
396 | dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop) | |
|
397 | dc.start() | |
|
376 | self.loop.add_timeout(self.loop.time() + 3, self.loop.stop) | |
|
398 | 377 | |
|
399 | 378 | def sigint_handler(self, signum, frame): |
|
400 | 379 | self.log.debug("SIGINT received, stopping launchers...") |
@@ -422,8 +401,7 b' class IPClusterEngines(BaseParallelApplication):' | |||
|
422 | 401 | if os.name=='posix': |
|
423 | 402 | daemonize() |
|
424 | 403 | |
|
425 |
|
|
|
426 | dc.start() | |
|
404 | self.loop.add_callback(self.start_engines) | |
|
427 | 405 | # Now write the new pid file AFTER our new forked pid is active. |
|
428 | 406 | # self.write_pid_file() |
|
429 | 407 | try: |
@@ -566,10 +544,10 b' class IPClusterStart(IPClusterEngines):' | |||
|
566 | 544 | if os.name=='posix': |
|
567 | 545 | daemonize() |
|
568 | 546 | |
|
569 | dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop) | |
|
570 |
|
|
|
571 | dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop) | |
|
572 | dc.start() | |
|
547 | def start(): | |
|
548 | self.start_controller() | |
|
549 | self.loop.add_timeout(self.loop.time() + self.delay, self.start_engines) | |
|
550 | self.loop.add_callback(start) | |
|
573 | 551 | # Now write the new pid file AFTER our new forked pid is active. |
|
574 | 552 | self.write_pid_file() |
|
575 | 553 | try: |
@@ -294,8 +294,7 b' class LocalProcessLauncher(BaseLauncher):' | |||
|
294 | 294 | except Exception: |
|
295 | 295 | self.log.debug("interrupt failed") |
|
296 | 296 | pass |
|
297 |
self.killer = |
|
|
298 | self.killer.start() | |
|
297 | self.killer = self.loop.add_timeout(self.loop.time() + delay, lambda : self.signal(SIGKILL)) | |
|
299 | 298 | |
|
300 | 299 | # callbacks, etc: |
|
301 | 300 |
@@ -16,7 +16,6 b' import time' | |||
|
16 | 16 | from datetime import datetime |
|
17 | 17 | |
|
18 | 18 | import zmq |
|
19 | from zmq.eventloop import ioloop | |
|
20 | 19 | from zmq.eventloop.zmqstream import ZMQStream |
|
21 | 20 | |
|
22 | 21 | # internal: |
@@ -25,7 +24,7 b' from IPython.utils.jsonutil import extract_dates' | |||
|
25 | 24 | from IPython.utils.localinterfaces import localhost |
|
26 | 25 | from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems |
|
27 | 26 | from IPython.utils.traitlets import ( |
|
28 |
HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, |
|
|
27 | HasTraits, Any, Instance, Integer, Unicode, Dict, Set, Tuple, DottedObjectName | |
|
29 | 28 | ) |
|
30 | 29 | |
|
31 | 30 | from IPython.parallel import error, util |
@@ -35,9 +34,6 b' from IPython.kernel.zmq.session import SessionFactory' | |||
|
35 | 34 | |
|
36 | 35 | from .heartmonitor import HeartMonitor |
|
37 | 36 | |
|
38 | #----------------------------------------------------------------------------- | |
|
39 | # Code | |
|
40 | #----------------------------------------------------------------------------- | |
|
41 | 37 | |
|
42 | 38 | def _passer(*args, **kwargs): |
|
43 | 39 | return |
@@ -108,13 +104,13 b' class EngineConnector(HasTraits):' | |||
|
108 | 104 | id (int): engine ID |
|
109 | 105 | uuid (unicode): engine UUID |
|
110 | 106 | pending: set of msg_ids |
|
111 |
stallback: |
|
|
107 | stallback: tornado timeout for stalled registration | |
|
112 | 108 | """ |
|
113 | 109 | |
|
114 | 110 | id = Integer(0) |
|
115 | 111 | uuid = Unicode() |
|
116 | 112 | pending = Set() |
|
117 | stallback = Instance(ioloop.DelayedCallback) | |
|
113 | stallback = Any() | |
|
118 | 114 | |
|
119 | 115 | |
|
120 | 116 | _db_shortcuts = { |
@@ -339,13 +335,10 b' class HubFactory(RegistrationFactory):' | |||
|
339 | 335 | url = util.disambiguate_url(self.client_url('task')) |
|
340 | 336 | r.connect(url) |
|
341 | 337 | |
|
342 | # convert seconds to msec | |
|
343 | registration_timeout = 1000*self.registration_timeout | |
|
344 | ||
|
345 | 338 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
|
346 | 339 | query=q, notifier=n, resubmit=r, db=self.db, |
|
347 | 340 | engine_info=self.engine_info, client_info=self.client_info, |
|
348 | log=self.log, registration_timeout=registration_timeout) | |
|
341 | log=self.log, registration_timeout=self.registration_timeout) | |
|
349 | 342 | |
|
350 | 343 | |
|
351 | 344 | class Hub(SessionFactory): |
@@ -963,9 +956,11 b' class Hub(SessionFactory):' | |||
|
963 | 956 | self.finish_registration(heart) |
|
964 | 957 | else: |
|
965 | 958 | purge = lambda : self._purge_stalled_registration(heart) |
|
966 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) | |
|
967 | dc.start() | |
|
968 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc) | |
|
959 | t = self.loop.add_timeout( | |
|
960 | self.loop.time() + self.registration_timeout, | |
|
961 | purge, | |
|
962 | ) | |
|
963 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=t) | |
|
969 | 964 | else: |
|
970 | 965 | self.log.error("registration::registration %i failed: %r", eid, content['evalue']) |
|
971 | 966 | |
@@ -979,20 +974,15 b' class Hub(SessionFactory):' | |||
|
979 | 974 | self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True) |
|
980 | 975 | return |
|
981 | 976 | self.log.info("registration::unregister_engine(%r)", eid) |
|
982 | # print (eid) | |
|
977 | ||
|
983 | 978 | uuid = self.keytable[eid] |
|
984 | 979 | content=dict(id=eid, uuid=uuid) |
|
985 | 980 | self.dead_engines.add(uuid) |
|
986 | # self.ids.remove(eid) | |
|
987 | # uuid = self.keytable.pop(eid) | |
|
988 | # | |
|
989 | # ec = self.engines.pop(eid) | |
|
990 | # self.hearts.pop(ec.heartbeat) | |
|
991 | # self.by_ident.pop(ec.queue) | |
|
992 | # self.completed.pop(eid) | |
|
993 | handleit = lambda : self._handle_stranded_msgs(eid, uuid) | |
|
994 | dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) | |
|
995 | dc.start() | |
|
981 | ||
|
982 | self.loop.add_timeout( | |
|
983 | self.loop.time() + self.registration_timeout, | |
|
984 | lambda : self._handle_stranded_msgs(eid, uuid), | |
|
985 | ) | |
|
996 | 986 | ############## TODO: HANDLE IT ################ |
|
997 | 987 | |
|
998 | 988 | self._save_engine_state() |
@@ -1040,7 +1030,7 b' class Hub(SessionFactory):' | |||
|
1040 | 1030 | return |
|
1041 | 1031 | self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) |
|
1042 | 1032 | if ec.stallback is not None: |
|
1043 |
ec.stallback |
|
|
1033 | self.loop.remove_timeout(ec.stallback) | |
|
1044 | 1034 | eid = ec.id |
|
1045 | 1035 | self.ids.add(eid) |
|
1046 | 1036 | self.keytable[eid] = ec.uuid |
@@ -1133,8 +1123,7 b' class Hub(SessionFactory):' | |||
|
1133 | 1123 | self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id) |
|
1134 | 1124 | # also notify other clients of shutdown |
|
1135 | 1125 | self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'}) |
|
1136 | dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop) | |
|
1137 | dc.start() | |
|
1126 | self.loop.add_timeout(self.loop.time() + 1, self._shutdown) | |
|
1138 | 1127 | |
|
1139 | 1128 | def _shutdown(self): |
|
1140 | 1129 | self.log.info("hub::hub shutting down.") |
@@ -321,8 +321,9 b' class TaskScheduler(SessionFactory):' | |||
|
321 | 321 | # wait 5 seconds before cleaning up pending jobs, since the results might |
|
322 | 322 | # still be incoming |
|
323 | 323 | if self.pending[uid]: |
|
324 | dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop) | |
|
325 | dc.start() | |
|
324 | self.loop.add_timeout(self.loop.time() + 5, | |
|
325 | lambda : self.handle_stranded_tasks(uid), | |
|
326 | ) | |
|
326 | 327 | else: |
|
327 | 328 | self.completed.pop(uid) |
|
328 | 329 | self.failed.pop(uid) |
@@ -25,7 +25,6 b' from IPython.parallel.controller.heartmonitor import Heart' | |||
|
25 | 25 | from IPython.parallel.factory import RegistrationFactory |
|
26 | 26 | from IPython.parallel.util import disambiguate_url |
|
27 | 27 | |
|
28 | from IPython.kernel.zmq.session import Message | |
|
29 | 28 | from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel |
|
30 | 29 | from IPython.kernel.zmq.kernelapp import IPKernelApp |
|
31 | 30 | |
@@ -155,7 +154,7 b' class EngineFactory(RegistrationFactory):' | |||
|
155 | 154 | |
|
156 | 155 | def complete_registration(self, msg, connect, maybe_tunnel): |
|
157 | 156 | # print msg |
|
158 | self._abort_dc.stop() | |
|
157 | self.loop.remove_timeout(self._abort_dc) | |
|
159 | 158 | ctx = self.context |
|
160 | 159 | loop = self.loop |
|
161 | 160 | identity = self.bident |
@@ -293,9 +292,10 b' class EngineFactory(RegistrationFactory):' | |||
|
293 | 292 | |
|
294 | 293 | |
|
295 | 294 | def start(self): |
|
296 | dc = ioloop.DelayedCallback(self.register, 0, self.loop) | |
|
297 |
d |
|
|
298 | self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) | |
|
299 | self._abort_dc.start() | |
|
295 | loop = self.loop | |
|
296 | def _start(): | |
|
297 | self.register() | |
|
298 | loop.add_timeout(loop.time() + self.timeout, self.abort) | |
|
299 | self.loop.add_callback(_start) | |
|
300 | 300 | |
|
301 | 301 |
General Comments 0
You need to be logged in to leave comments.
Login now