##// END OF EJS Templates
stop using deprecated DelayedCallback...
MinRK -
Show More
@@ -17,10 +17,9 b' Authors:'
17 #-----------------------------------------------------------------------------
17 #-----------------------------------------------------------------------------
18
18
19 from tornado import web
19 from tornado import web
20 from zmq.eventloop import ioloop
21
20
22 from IPython.config.configurable import LoggingConfigurable
21 from IPython.config.configurable import LoggingConfigurable
23 from IPython.utils.traitlets import Dict, Instance, CFloat
22 from IPython.utils.traitlets import Dict, Instance, Float
24 from IPython.core.profileapp import list_profiles_in
23 from IPython.core.profileapp import list_profiles_in
25 from IPython.core.profiledir import ProfileDir
24 from IPython.core.profiledir import ProfileDir
26 from IPython.utils import py3compat
25 from IPython.utils import py3compat
@@ -38,7 +37,7 b' class ClusterManager(LoggingConfigurable):'
38
37
39 profiles = Dict()
38 profiles = Dict()
40
39
41 delay = CFloat(1., config=True,
40 delay = Float(1., config=True,
42 help="delay (in s) between starting the controller and the engines")
41 help="delay (in s) between starting the controller and the engines")
43
42
44 loop = Instance('zmq.eventloop.ioloop.IOLoop')
43 loop = Instance('zmq.eventloop.ioloop.IOLoop')
@@ -133,11 +132,13 b' class ClusterManager(LoggingConfigurable):'
133 esl.stop()
132 esl.stop()
134 clean_data()
133 clean_data()
135 cl.on_stop(controller_stopped)
134 cl.on_stop(controller_stopped)
135 loop = self.loop
136
136
137 dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
137 def start():
138 dc.start()
138 """start the controller, then the engines after a delay"""
139 dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
139 cl.start()
140 dc.start()
140 loop.add_timeout(self.loop.time() + self.delay, lambda : esl.start(n))
141 self.loop.add_callback(start)
141
142
142 self.log.debug('Cluster started')
143 self.log.debug('Cluster started')
143 data['controller_launcher'] = cl
144 data['controller_launcher'] = cl
@@ -1,32 +1,14 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 """
2 """
3 The Base Application class for IPython.parallel apps
3 The Base Application class for IPython.parallel apps
4
5 Authors:
6
7 * Brian Granger
8 * Min RK
9
10 """
4 """
11
5
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
18
19 #-----------------------------------------------------------------------------
20 # Imports
21 #-----------------------------------------------------------------------------
22
6
23 import os
7 import os
24 import logging
8 import logging
25 import re
9 import re
26 import sys
10 import sys
27
11
28 from subprocess import Popen, PIPE
29
30 from IPython.config.application import catch_config_error, LevelFormatter
12 from IPython.config.application import catch_config_error, LevelFormatter
31 from IPython.core import release
13 from IPython.core import release
32 from IPython.core.crashhandler import CrashHandler
14 from IPython.core.crashhandler import CrashHandler
@@ -1,27 +1,8 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """The ipcluster application."""
4 The ipcluster application.
5
6 Authors:
7
8 * Brian Granger
9 * MinRK
10
11 """
12 from __future__ import print_function
4 from __future__ import print_function
13
5
14 #-----------------------------------------------------------------------------
15 # Copyright (C) 2008-2011 The IPython Development Team
16 #
17 # Distributed under the terms of the BSD License. The full license is in
18 # the file COPYING, distributed as part of this software.
19 #-----------------------------------------------------------------------------
20
21 #-----------------------------------------------------------------------------
22 # Imports
23 #-----------------------------------------------------------------------------
24
25 import errno
6 import errno
26 import logging
7 import logging
27 import os
8 import os
@@ -30,9 +11,8 b' import signal'
30
11
31 from subprocess import check_call, CalledProcessError, PIPE
12 from subprocess import check_call, CalledProcessError, PIPE
32 import zmq
13 import zmq
33 from zmq.eventloop import ioloop
34
14
35 from IPython.config.application import Application, boolean_flag, catch_config_error
15 from IPython.config.application import catch_config_error
36 from IPython.config.loader import Config
16 from IPython.config.loader import Config
37 from IPython.core.application import BaseIPythonApplication
17 from IPython.core.application import BaseIPythonApplication
38 from IPython.core.profiledir import ProfileDir
18 from IPython.core.profiledir import ProfileDir
@@ -355,7 +335,7 b' class IPClusterEngines(BaseParallelApplication):'
355 raise
335 raise
356 self.engine_launcher.on_stop(self.engines_stopped_early)
336 self.engine_launcher.on_stop(self.engines_stopped_early)
357 if self.early_shutdown:
337 if self.early_shutdown:
358 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
338 self.loop.add_timeout(self.loop.time() + self.early_shutdown, self.engines_started_ok)
359
339
360 def engines_stopped_early(self, r):
340 def engines_stopped_early(self, r):
361 if self.early_shutdown and not self._stopping:
341 if self.early_shutdown and not self._stopping:
@@ -393,8 +373,7 b' class IPClusterEngines(BaseParallelApplication):'
393 self.log.error("IPython cluster: stopping")
373 self.log.error("IPython cluster: stopping")
394 self.stop_engines()
374 self.stop_engines()
395 # Wait a few seconds to let things shut down.
375 # Wait a few seconds to let things shut down.
396 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
376 self.loop.add_timeout(self.loop.time() + 3, self.loop.stop)
397 dc.start()
398
377
399 def sigint_handler(self, signum, frame):
378 def sigint_handler(self, signum, frame):
400 self.log.debug("SIGINT received, stopping launchers...")
379 self.log.debug("SIGINT received, stopping launchers...")
@@ -422,8 +401,7 b' class IPClusterEngines(BaseParallelApplication):'
422 if os.name=='posix':
401 if os.name=='posix':
423 daemonize()
402 daemonize()
424
403
425 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
404 self.loop.add_callback(self.start_engines)
426 dc.start()
427 # Now write the new pid file AFTER our new forked pid is active.
405 # Now write the new pid file AFTER our new forked pid is active.
428 # self.write_pid_file()
406 # self.write_pid_file()
429 try:
407 try:
@@ -566,10 +544,10 b' class IPClusterStart(IPClusterEngines):'
566 if os.name=='posix':
544 if os.name=='posix':
567 daemonize()
545 daemonize()
568
546
569 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
547 def start():
570 dc.start()
548 self.start_controller()
571 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
549 self.loop.add_timeout(self.loop.time() + self.delay, self.start_engines)
572 dc.start()
550 self.loop.add_callback(start)
573 # Now write the new pid file AFTER our new forked pid is active.
551 # Now write the new pid file AFTER our new forked pid is active.
574 self.write_pid_file()
552 self.write_pid_file()
575 try:
553 try:
@@ -294,8 +294,7 b' class LocalProcessLauncher(BaseLauncher):'
294 except Exception:
294 except Exception:
295 self.log.debug("interrupt failed")
295 self.log.debug("interrupt failed")
296 pass
296 pass
297 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
297 self.killer = self.loop.add_timeout(self.loop.time() + delay, lambda : self.signal(SIGKILL))
298 self.killer.start()
299
298
300 # callbacks, etc:
299 # callbacks, etc:
301
300
@@ -16,7 +16,6 b' import time'
16 from datetime import datetime
16 from datetime import datetime
17
17
18 import zmq
18 import zmq
19 from zmq.eventloop import ioloop
20 from zmq.eventloop.zmqstream import ZMQStream
19 from zmq.eventloop.zmqstream import ZMQStream
21
20
22 # internal:
21 # internal:
@@ -25,7 +24,7 b' from IPython.utils.jsonutil import extract_dates'
25 from IPython.utils.localinterfaces import localhost
24 from IPython.utils.localinterfaces import localhost
26 from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems
25 from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems
27 from IPython.utils.traitlets import (
26 from IPython.utils.traitlets import (
28 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
27 HasTraits, Any, Instance, Integer, Unicode, Dict, Set, Tuple, DottedObjectName
29 )
28 )
30
29
31 from IPython.parallel import error, util
30 from IPython.parallel import error, util
@@ -35,9 +34,6 b' from IPython.kernel.zmq.session import SessionFactory'
35
34
36 from .heartmonitor import HeartMonitor
35 from .heartmonitor import HeartMonitor
37
36
38 #-----------------------------------------------------------------------------
39 # Code
40 #-----------------------------------------------------------------------------
41
37
42 def _passer(*args, **kwargs):
38 def _passer(*args, **kwargs):
43 return
39 return
@@ -108,13 +104,13 b' class EngineConnector(HasTraits):'
108 id (int): engine ID
104 id (int): engine ID
109 uuid (unicode): engine UUID
105 uuid (unicode): engine UUID
110 pending: set of msg_ids
106 pending: set of msg_ids
111 stallback: DelayedCallback for stalled registration
107 stallback: tornado timeout for stalled registration
112 """
108 """
113
109
114 id = Integer(0)
110 id = Integer(0)
115 uuid = Unicode()
111 uuid = Unicode()
116 pending = Set()
112 pending = Set()
117 stallback = Instance(ioloop.DelayedCallback)
113 stallback = Any()
118
114
119
115
120 _db_shortcuts = {
116 _db_shortcuts = {
@@ -339,13 +335,10 b' class HubFactory(RegistrationFactory):'
339 url = util.disambiguate_url(self.client_url('task'))
335 url = util.disambiguate_url(self.client_url('task'))
340 r.connect(url)
336 r.connect(url)
341
337
342 # convert seconds to msec
343 registration_timeout = 1000*self.registration_timeout
344
345 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
338 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
346 query=q, notifier=n, resubmit=r, db=self.db,
339 query=q, notifier=n, resubmit=r, db=self.db,
347 engine_info=self.engine_info, client_info=self.client_info,
340 engine_info=self.engine_info, client_info=self.client_info,
348 log=self.log, registration_timeout=registration_timeout)
341 log=self.log, registration_timeout=self.registration_timeout)
349
342
350
343
351 class Hub(SessionFactory):
344 class Hub(SessionFactory):
@@ -963,9 +956,11 b' class Hub(SessionFactory):'
963 self.finish_registration(heart)
956 self.finish_registration(heart)
964 else:
957 else:
965 purge = lambda : self._purge_stalled_registration(heart)
958 purge = lambda : self._purge_stalled_registration(heart)
966 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
959 t = self.loop.add_timeout(
967 dc.start()
960 self.loop.time() + self.registration_timeout,
968 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
961 purge,
962 )
963 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=t)
969 else:
964 else:
970 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
965 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
971
966
@@ -979,20 +974,15 b' class Hub(SessionFactory):'
979 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
974 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
980 return
975 return
981 self.log.info("registration::unregister_engine(%r)", eid)
976 self.log.info("registration::unregister_engine(%r)", eid)
982 # print (eid)
977
983 uuid = self.keytable[eid]
978 uuid = self.keytable[eid]
984 content=dict(id=eid, uuid=uuid)
979 content=dict(id=eid, uuid=uuid)
985 self.dead_engines.add(uuid)
980 self.dead_engines.add(uuid)
986 # self.ids.remove(eid)
981
987 # uuid = self.keytable.pop(eid)
982 self.loop.add_timeout(
988 #
983 self.loop.time() + self.registration_timeout,
989 # ec = self.engines.pop(eid)
984 lambda : self._handle_stranded_msgs(eid, uuid),
990 # self.hearts.pop(ec.heartbeat)
985 )
991 # self.by_ident.pop(ec.queue)
992 # self.completed.pop(eid)
993 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
994 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
995 dc.start()
996 ############## TODO: HANDLE IT ################
986 ############## TODO: HANDLE IT ################
997
987
998 self._save_engine_state()
988 self._save_engine_state()
@@ -1040,7 +1030,7 b' class Hub(SessionFactory):'
1040 return
1030 return
1041 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1031 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1042 if ec.stallback is not None:
1032 if ec.stallback is not None:
1043 ec.stallback.stop()
1033 self.loop.remove_timeout(ec.stallback)
1044 eid = ec.id
1034 eid = ec.id
1045 self.ids.add(eid)
1035 self.ids.add(eid)
1046 self.keytable[eid] = ec.uuid
1036 self.keytable[eid] = ec.uuid
@@ -1133,8 +1123,7 b' class Hub(SessionFactory):'
1133 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1123 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1134 # also notify other clients of shutdown
1124 # also notify other clients of shutdown
1135 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1125 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1136 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1126 self.loop.add_timeout(self.loop.time() + 1, self._shutdown)
1137 dc.start()
1138
1127
1139 def _shutdown(self):
1128 def _shutdown(self):
1140 self.log.info("hub::hub shutting down.")
1129 self.log.info("hub::hub shutting down.")
@@ -321,8 +321,9 b' class TaskScheduler(SessionFactory):'
321 # wait 5 seconds before cleaning up pending jobs, since the results might
321 # wait 5 seconds before cleaning up pending jobs, since the results might
322 # still be incoming
322 # still be incoming
323 if self.pending[uid]:
323 if self.pending[uid]:
324 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
324 self.loop.add_timeout(self.loop.time() + 5,
325 dc.start()
325 lambda : self.handle_stranded_tasks(uid),
326 )
326 else:
327 else:
327 self.completed.pop(uid)
328 self.completed.pop(uid)
328 self.failed.pop(uid)
329 self.failed.pop(uid)
@@ -25,7 +25,6 b' from IPython.parallel.controller.heartmonitor import Heart'
25 from IPython.parallel.factory import RegistrationFactory
25 from IPython.parallel.factory import RegistrationFactory
26 from IPython.parallel.util import disambiguate_url
26 from IPython.parallel.util import disambiguate_url
27
27
28 from IPython.kernel.zmq.session import Message
29 from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel
28 from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel
30 from IPython.kernel.zmq.kernelapp import IPKernelApp
29 from IPython.kernel.zmq.kernelapp import IPKernelApp
31
30
@@ -155,7 +154,7 b' class EngineFactory(RegistrationFactory):'
155
154
156 def complete_registration(self, msg, connect, maybe_tunnel):
155 def complete_registration(self, msg, connect, maybe_tunnel):
157 # print msg
156 # print msg
158 self._abort_dc.stop()
157 self.loop.remove_timeout(self._abort_dc)
159 ctx = self.context
158 ctx = self.context
160 loop = self.loop
159 loop = self.loop
161 identity = self.bident
160 identity = self.bident
@@ -293,9 +292,10 b' class EngineFactory(RegistrationFactory):'
293
292
294
293
295 def start(self):
294 def start(self):
296 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
295 loop = self.loop
297 dc.start()
296 def _start():
298 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
297 self.register()
299 self._abort_dc.start()
298 loop.add_timeout(loop.time() + self.timeout, self.abort)
299 self.loop.add_callback(_start)
300
300
301
301
General Comments 0
You need to be logged in to leave comments. Login now