Show More
@@ -17,10 +17,9 b' Authors:' | |||||
17 | #----------------------------------------------------------------------------- |
|
17 | #----------------------------------------------------------------------------- | |
18 |
|
18 | |||
19 | from tornado import web |
|
19 | from tornado import web | |
20 | from zmq.eventloop import ioloop |
|
|||
21 |
|
20 | |||
22 | from IPython.config.configurable import LoggingConfigurable |
|
21 | from IPython.config.configurable import LoggingConfigurable | |
23 |
from IPython.utils.traitlets import Dict, Instance, |
|
22 | from IPython.utils.traitlets import Dict, Instance, Float | |
24 | from IPython.core.profileapp import list_profiles_in |
|
23 | from IPython.core.profileapp import list_profiles_in | |
25 | from IPython.core.profiledir import ProfileDir |
|
24 | from IPython.core.profiledir import ProfileDir | |
26 | from IPython.utils import py3compat |
|
25 | from IPython.utils import py3compat | |
@@ -38,7 +37,7 b' class ClusterManager(LoggingConfigurable):' | |||||
38 |
|
37 | |||
39 | profiles = Dict() |
|
38 | profiles = Dict() | |
40 |
|
39 | |||
41 |
delay = |
|
40 | delay = Float(1., config=True, | |
42 | help="delay (in s) between starting the controller and the engines") |
|
41 | help="delay (in s) between starting the controller and the engines") | |
43 |
|
42 | |||
44 | loop = Instance('zmq.eventloop.ioloop.IOLoop') |
|
43 | loop = Instance('zmq.eventloop.ioloop.IOLoop') | |
@@ -133,11 +132,13 b' class ClusterManager(LoggingConfigurable):' | |||||
133 | esl.stop() |
|
132 | esl.stop() | |
134 | clean_data() |
|
133 | clean_data() | |
135 | cl.on_stop(controller_stopped) |
|
134 | cl.on_stop(controller_stopped) | |
|
135 | loop = self.loop | |||
136 |
|
136 | |||
137 | dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop) |
|
137 | def start(): | |
138 | dc.start() |
|
138 | """start the controller, then the engines after a delay""" | |
139 | dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop) |
|
139 | cl.start() | |
140 | dc.start() |
|
140 | loop.add_timeout(self.loop.time() + self.delay, lambda : esl.start(n)) | |
|
141 | self.loop.add_callback(start) | |||
141 |
|
142 | |||
142 | self.log.debug('Cluster started') |
|
143 | self.log.debug('Cluster started') | |
143 | data['controller_launcher'] = cl |
|
144 | data['controller_launcher'] = cl |
@@ -1,32 +1,14 b'' | |||||
1 | # encoding: utf-8 |
|
1 | # encoding: utf-8 | |
2 | """ |
|
2 | """ | |
3 | The Base Application class for IPython.parallel apps |
|
3 | The Base Application class for IPython.parallel apps | |
4 |
|
||||
5 | Authors: |
|
|||
6 |
|
||||
7 | * Brian Granger |
|
|||
8 | * Min RK |
|
|||
9 |
|
||||
10 | """ |
|
4 | """ | |
11 |
|
5 | |||
12 | #----------------------------------------------------------------------------- |
|
|||
13 | # Copyright (C) 2008-2011 The IPython Development Team |
|
|||
14 | # |
|
|||
15 | # Distributed under the terms of the BSD License. The full license is in |
|
|||
16 | # the file COPYING, distributed as part of this software. |
|
|||
17 | #----------------------------------------------------------------------------- |
|
|||
18 |
|
||||
19 | #----------------------------------------------------------------------------- |
|
|||
20 | # Imports |
|
|||
21 | #----------------------------------------------------------------------------- |
|
|||
22 |
|
6 | |||
23 | import os |
|
7 | import os | |
24 | import logging |
|
8 | import logging | |
25 | import re |
|
9 | import re | |
26 | import sys |
|
10 | import sys | |
27 |
|
11 | |||
28 | from subprocess import Popen, PIPE |
|
|||
29 |
|
||||
30 | from IPython.config.application import catch_config_error, LevelFormatter |
|
12 | from IPython.config.application import catch_config_error, LevelFormatter | |
31 | from IPython.core import release |
|
13 | from IPython.core import release | |
32 | from IPython.core.crashhandler import CrashHandler |
|
14 | from IPython.core.crashhandler import CrashHandler |
@@ -1,27 +1,8 b'' | |||||
1 | #!/usr/bin/env python |
|
1 | #!/usr/bin/env python | |
2 | # encoding: utf-8 |
|
2 | # encoding: utf-8 | |
3 | """ |
|
3 | """The ipcluster application.""" | |
4 | The ipcluster application. |
|
|||
5 |
|
||||
6 | Authors: |
|
|||
7 |
|
||||
8 | * Brian Granger |
|
|||
9 | * MinRK |
|
|||
10 |
|
||||
11 | """ |
|
|||
12 | from __future__ import print_function |
|
4 | from __future__ import print_function | |
13 |
|
5 | |||
14 | #----------------------------------------------------------------------------- |
|
|||
15 | # Copyright (C) 2008-2011 The IPython Development Team |
|
|||
16 | # |
|
|||
17 | # Distributed under the terms of the BSD License. The full license is in |
|
|||
18 | # the file COPYING, distributed as part of this software. |
|
|||
19 | #----------------------------------------------------------------------------- |
|
|||
20 |
|
||||
21 | #----------------------------------------------------------------------------- |
|
|||
22 | # Imports |
|
|||
23 | #----------------------------------------------------------------------------- |
|
|||
24 |
|
||||
25 | import errno |
|
6 | import errno | |
26 | import logging |
|
7 | import logging | |
27 | import os |
|
8 | import os | |
@@ -30,9 +11,8 b' import signal' | |||||
30 |
|
11 | |||
31 | from subprocess import check_call, CalledProcessError, PIPE |
|
12 | from subprocess import check_call, CalledProcessError, PIPE | |
32 | import zmq |
|
13 | import zmq | |
33 | from zmq.eventloop import ioloop |
|
|||
34 |
|
14 | |||
35 |
from IPython.config.application import |
|
15 | from IPython.config.application import catch_config_error | |
36 | from IPython.config.loader import Config |
|
16 | from IPython.config.loader import Config | |
37 | from IPython.core.application import BaseIPythonApplication |
|
17 | from IPython.core.application import BaseIPythonApplication | |
38 | from IPython.core.profiledir import ProfileDir |
|
18 | from IPython.core.profiledir import ProfileDir | |
@@ -355,7 +335,7 b' class IPClusterEngines(BaseParallelApplication):' | |||||
355 | raise |
|
335 | raise | |
356 | self.engine_launcher.on_stop(self.engines_stopped_early) |
|
336 | self.engine_launcher.on_stop(self.engines_stopped_early) | |
357 | if self.early_shutdown: |
|
337 | if self.early_shutdown: | |
358 | ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start() |
|
338 | self.loop.add_timeout(self.loop.time() + self.early_shutdown, self.engines_started_ok) | |
359 |
|
339 | |||
360 | def engines_stopped_early(self, r): |
|
340 | def engines_stopped_early(self, r): | |
361 | if self.early_shutdown and not self._stopping: |
|
341 | if self.early_shutdown and not self._stopping: | |
@@ -393,8 +373,7 b' class IPClusterEngines(BaseParallelApplication):' | |||||
393 | self.log.error("IPython cluster: stopping") |
|
373 | self.log.error("IPython cluster: stopping") | |
394 | self.stop_engines() |
|
374 | self.stop_engines() | |
395 | # Wait a few seconds to let things shut down. |
|
375 | # Wait a few seconds to let things shut down. | |
396 | dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop) |
|
376 | self.loop.add_timeout(self.loop.time() + 3, self.loop.stop) | |
397 | dc.start() |
|
|||
398 |
|
377 | |||
399 | def sigint_handler(self, signum, frame): |
|
378 | def sigint_handler(self, signum, frame): | |
400 | self.log.debug("SIGINT received, stopping launchers...") |
|
379 | self.log.debug("SIGINT received, stopping launchers...") | |
@@ -422,8 +401,7 b' class IPClusterEngines(BaseParallelApplication):' | |||||
422 | if os.name=='posix': |
|
401 | if os.name=='posix': | |
423 | daemonize() |
|
402 | daemonize() | |
424 |
|
403 | |||
425 |
|
|
404 | self.loop.add_callback(self.start_engines) | |
426 | dc.start() |
|
|||
427 | # Now write the new pid file AFTER our new forked pid is active. |
|
405 | # Now write the new pid file AFTER our new forked pid is active. | |
428 | # self.write_pid_file() |
|
406 | # self.write_pid_file() | |
429 | try: |
|
407 | try: | |
@@ -566,10 +544,10 b' class IPClusterStart(IPClusterEngines):' | |||||
566 | if os.name=='posix': |
|
544 | if os.name=='posix': | |
567 | daemonize() |
|
545 | daemonize() | |
568 |
|
546 | |||
569 | dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop) |
|
547 | def start(): | |
570 |
|
|
548 | self.start_controller() | |
571 | dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop) |
|
549 | self.loop.add_timeout(self.loop.time() + self.delay, self.start_engines) | |
572 | dc.start() |
|
550 | self.loop.add_callback(start) | |
573 | # Now write the new pid file AFTER our new forked pid is active. |
|
551 | # Now write the new pid file AFTER our new forked pid is active. | |
574 | self.write_pid_file() |
|
552 | self.write_pid_file() | |
575 | try: |
|
553 | try: |
@@ -294,8 +294,7 b' class LocalProcessLauncher(BaseLauncher):' | |||||
294 | except Exception: |
|
294 | except Exception: | |
295 | self.log.debug("interrupt failed") |
|
295 | self.log.debug("interrupt failed") | |
296 | pass |
|
296 | pass | |
297 |
self.killer = |
|
297 | self.killer = self.loop.add_timeout(self.loop.time() + delay, lambda : self.signal(SIGKILL)) | |
298 | self.killer.start() |
|
|||
299 |
|
298 | |||
300 | # callbacks, etc: |
|
299 | # callbacks, etc: | |
301 |
|
300 |
@@ -16,7 +16,6 b' import time' | |||||
16 | from datetime import datetime |
|
16 | from datetime import datetime | |
17 |
|
17 | |||
18 | import zmq |
|
18 | import zmq | |
19 | from zmq.eventloop import ioloop |
|
|||
20 | from zmq.eventloop.zmqstream import ZMQStream |
|
19 | from zmq.eventloop.zmqstream import ZMQStream | |
21 |
|
20 | |||
22 | # internal: |
|
21 | # internal: | |
@@ -25,7 +24,7 b' from IPython.utils.jsonutil import extract_dates' | |||||
25 | from IPython.utils.localinterfaces import localhost |
|
24 | from IPython.utils.localinterfaces import localhost | |
26 | from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems |
|
25 | from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems | |
27 | from IPython.utils.traitlets import ( |
|
26 | from IPython.utils.traitlets import ( | |
28 |
HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, |
|
27 | HasTraits, Any, Instance, Integer, Unicode, Dict, Set, Tuple, DottedObjectName | |
29 | ) |
|
28 | ) | |
30 |
|
29 | |||
31 | from IPython.parallel import error, util |
|
30 | from IPython.parallel import error, util | |
@@ -35,9 +34,6 b' from IPython.kernel.zmq.session import SessionFactory' | |||||
35 |
|
34 | |||
36 | from .heartmonitor import HeartMonitor |
|
35 | from .heartmonitor import HeartMonitor | |
37 |
|
36 | |||
38 | #----------------------------------------------------------------------------- |
|
|||
39 | # Code |
|
|||
40 | #----------------------------------------------------------------------------- |
|
|||
41 |
|
37 | |||
42 | def _passer(*args, **kwargs): |
|
38 | def _passer(*args, **kwargs): | |
43 | return |
|
39 | return | |
@@ -108,13 +104,13 b' class EngineConnector(HasTraits):' | |||||
108 | id (int): engine ID |
|
104 | id (int): engine ID | |
109 | uuid (unicode): engine UUID |
|
105 | uuid (unicode): engine UUID | |
110 | pending: set of msg_ids |
|
106 | pending: set of msg_ids | |
111 |
stallback: |
|
107 | stallback: tornado timeout for stalled registration | |
112 | """ |
|
108 | """ | |
113 |
|
109 | |||
114 | id = Integer(0) |
|
110 | id = Integer(0) | |
115 | uuid = Unicode() |
|
111 | uuid = Unicode() | |
116 | pending = Set() |
|
112 | pending = Set() | |
117 | stallback = Instance(ioloop.DelayedCallback) |
|
113 | stallback = Any() | |
118 |
|
114 | |||
119 |
|
115 | |||
120 | _db_shortcuts = { |
|
116 | _db_shortcuts = { | |
@@ -339,13 +335,10 b' class HubFactory(RegistrationFactory):' | |||||
339 | url = util.disambiguate_url(self.client_url('task')) |
|
335 | url = util.disambiguate_url(self.client_url('task')) | |
340 | r.connect(url) |
|
336 | r.connect(url) | |
341 |
|
337 | |||
342 | # convert seconds to msec |
|
|||
343 | registration_timeout = 1000*self.registration_timeout |
|
|||
344 |
|
||||
345 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, |
|
338 | self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, | |
346 | query=q, notifier=n, resubmit=r, db=self.db, |
|
339 | query=q, notifier=n, resubmit=r, db=self.db, | |
347 | engine_info=self.engine_info, client_info=self.client_info, |
|
340 | engine_info=self.engine_info, client_info=self.client_info, | |
348 | log=self.log, registration_timeout=registration_timeout) |
|
341 | log=self.log, registration_timeout=self.registration_timeout) | |
349 |
|
342 | |||
350 |
|
343 | |||
351 | class Hub(SessionFactory): |
|
344 | class Hub(SessionFactory): | |
@@ -963,9 +956,11 b' class Hub(SessionFactory):' | |||||
963 | self.finish_registration(heart) |
|
956 | self.finish_registration(heart) | |
964 | else: |
|
957 | else: | |
965 | purge = lambda : self._purge_stalled_registration(heart) |
|
958 | purge = lambda : self._purge_stalled_registration(heart) | |
966 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) |
|
959 | t = self.loop.add_timeout( | |
967 | dc.start() |
|
960 | self.loop.time() + self.registration_timeout, | |
968 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc) |
|
961 | purge, | |
|
962 | ) | |||
|
963 | self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=t) | |||
969 | else: |
|
964 | else: | |
970 | self.log.error("registration::registration %i failed: %r", eid, content['evalue']) |
|
965 | self.log.error("registration::registration %i failed: %r", eid, content['evalue']) | |
971 |
|
966 | |||
@@ -979,20 +974,15 b' class Hub(SessionFactory):' | |||||
979 | self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True) |
|
974 | self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True) | |
980 | return |
|
975 | return | |
981 | self.log.info("registration::unregister_engine(%r)", eid) |
|
976 | self.log.info("registration::unregister_engine(%r)", eid) | |
982 | # print (eid) |
|
977 | ||
983 | uuid = self.keytable[eid] |
|
978 | uuid = self.keytable[eid] | |
984 | content=dict(id=eid, uuid=uuid) |
|
979 | content=dict(id=eid, uuid=uuid) | |
985 | self.dead_engines.add(uuid) |
|
980 | self.dead_engines.add(uuid) | |
986 | # self.ids.remove(eid) |
|
981 | ||
987 | # uuid = self.keytable.pop(eid) |
|
982 | self.loop.add_timeout( | |
988 | # |
|
983 | self.loop.time() + self.registration_timeout, | |
989 | # ec = self.engines.pop(eid) |
|
984 | lambda : self._handle_stranded_msgs(eid, uuid), | |
990 | # self.hearts.pop(ec.heartbeat) |
|
985 | ) | |
991 | # self.by_ident.pop(ec.queue) |
|
|||
992 | # self.completed.pop(eid) |
|
|||
993 | handleit = lambda : self._handle_stranded_msgs(eid, uuid) |
|
|||
994 | dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) |
|
|||
995 | dc.start() |
|
|||
996 | ############## TODO: HANDLE IT ################ |
|
986 | ############## TODO: HANDLE IT ################ | |
997 |
|
987 | |||
998 | self._save_engine_state() |
|
988 | self._save_engine_state() | |
@@ -1040,7 +1030,7 b' class Hub(SessionFactory):' | |||||
1040 | return |
|
1030 | return | |
1041 | self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) |
|
1031 | self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) | |
1042 | if ec.stallback is not None: |
|
1032 | if ec.stallback is not None: | |
1043 |
ec.stallback |
|
1033 | self.loop.remove_timeout(ec.stallback) | |
1044 | eid = ec.id |
|
1034 | eid = ec.id | |
1045 | self.ids.add(eid) |
|
1035 | self.ids.add(eid) | |
1046 | self.keytable[eid] = ec.uuid |
|
1036 | self.keytable[eid] = ec.uuid | |
@@ -1133,8 +1123,7 b' class Hub(SessionFactory):' | |||||
1133 | self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id) |
|
1123 | self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id) | |
1134 | # also notify other clients of shutdown |
|
1124 | # also notify other clients of shutdown | |
1135 | self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'}) |
|
1125 | self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'}) | |
1136 | dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop) |
|
1126 | self.loop.add_timeout(self.loop.time() + 1, self._shutdown) | |
1137 | dc.start() |
|
|||
1138 |
|
1127 | |||
1139 | def _shutdown(self): |
|
1128 | def _shutdown(self): | |
1140 | self.log.info("hub::hub shutting down.") |
|
1129 | self.log.info("hub::hub shutting down.") |
@@ -321,8 +321,9 b' class TaskScheduler(SessionFactory):' | |||||
321 | # wait 5 seconds before cleaning up pending jobs, since the results might |
|
321 | # wait 5 seconds before cleaning up pending jobs, since the results might | |
322 | # still be incoming |
|
322 | # still be incoming | |
323 | if self.pending[uid]: |
|
323 | if self.pending[uid]: | |
324 | dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop) |
|
324 | self.loop.add_timeout(self.loop.time() + 5, | |
325 | dc.start() |
|
325 | lambda : self.handle_stranded_tasks(uid), | |
|
326 | ) | |||
326 | else: |
|
327 | else: | |
327 | self.completed.pop(uid) |
|
328 | self.completed.pop(uid) | |
328 | self.failed.pop(uid) |
|
329 | self.failed.pop(uid) |
@@ -25,7 +25,6 b' from IPython.parallel.controller.heartmonitor import Heart' | |||||
25 | from IPython.parallel.factory import RegistrationFactory |
|
25 | from IPython.parallel.factory import RegistrationFactory | |
26 | from IPython.parallel.util import disambiguate_url |
|
26 | from IPython.parallel.util import disambiguate_url | |
27 |
|
27 | |||
28 | from IPython.kernel.zmq.session import Message |
|
|||
29 | from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel |
|
28 | from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel | |
30 | from IPython.kernel.zmq.kernelapp import IPKernelApp |
|
29 | from IPython.kernel.zmq.kernelapp import IPKernelApp | |
31 |
|
30 | |||
@@ -155,7 +154,7 b' class EngineFactory(RegistrationFactory):' | |||||
155 |
|
154 | |||
156 | def complete_registration(self, msg, connect, maybe_tunnel): |
|
155 | def complete_registration(self, msg, connect, maybe_tunnel): | |
157 | # print msg |
|
156 | # print msg | |
158 | self._abort_dc.stop() |
|
157 | self.loop.remove_timeout(self._abort_dc) | |
159 | ctx = self.context |
|
158 | ctx = self.context | |
160 | loop = self.loop |
|
159 | loop = self.loop | |
161 | identity = self.bident |
|
160 | identity = self.bident | |
@@ -293,9 +292,10 b' class EngineFactory(RegistrationFactory):' | |||||
293 |
|
292 | |||
294 |
|
293 | |||
295 | def start(self): |
|
294 | def start(self): | |
296 | dc = ioloop.DelayedCallback(self.register, 0, self.loop) |
|
295 | loop = self.loop | |
297 |
d |
|
296 | def _start(): | |
298 | self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) |
|
297 | self.register() | |
299 | self._abort_dc.start() |
|
298 | loop.add_timeout(loop.time() + self.timeout, self.abort) | |
|
299 | self.loop.add_callback(_start) | |||
300 |
|
300 | |||
301 |
|
301 |
General Comments 0
You need to be logged in to leave comments.
Login now