##// END OF EJS Templates
stop using deprecated DelayedCallback...
MinRK -
Show More
@@ -17,10 +17,9 b' Authors:'
17 17 #-----------------------------------------------------------------------------
18 18
19 19 from tornado import web
20 from zmq.eventloop import ioloop
21 20
22 21 from IPython.config.configurable import LoggingConfigurable
23 from IPython.utils.traitlets import Dict, Instance, CFloat
22 from IPython.utils.traitlets import Dict, Instance, Float
24 23 from IPython.core.profileapp import list_profiles_in
25 24 from IPython.core.profiledir import ProfileDir
26 25 from IPython.utils import py3compat
@@ -38,7 +37,7 b' class ClusterManager(LoggingConfigurable):'
38 37
39 38 profiles = Dict()
40 39
41 delay = CFloat(1., config=True,
40 delay = Float(1., config=True,
42 41 help="delay (in s) between starting the controller and the engines")
43 42
44 43 loop = Instance('zmq.eventloop.ioloop.IOLoop')
@@ -133,11 +132,13 b' class ClusterManager(LoggingConfigurable):'
133 132 esl.stop()
134 133 clean_data()
135 134 cl.on_stop(controller_stopped)
136
137 dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
138 dc.start()
139 dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
140 dc.start()
135 loop = self.loop
136
137 def start():
138 """start the controller, then the engines after a delay"""
139 cl.start()
140 loop.add_timeout(self.loop.time() + self.delay, lambda : esl.start(n))
141 self.loop.add_callback(start)
141 142
142 143 self.log.debug('Cluster started')
143 144 data['controller_launcher'] = cl
@@ -1,32 +1,14 b''
1 1 # encoding: utf-8
2 2 """
3 3 The Base Application class for IPython.parallel apps
4
5 Authors:
6
7 * Brian Granger
8 * Min RK
9
10 4 """
11 5
12 #-----------------------------------------------------------------------------
13 # Copyright (C) 2008-2011 The IPython Development Team
14 #
15 # Distributed under the terms of the BSD License. The full license is in
16 # the file COPYING, distributed as part of this software.
17 #-----------------------------------------------------------------------------
18
19 #-----------------------------------------------------------------------------
20 # Imports
21 #-----------------------------------------------------------------------------
22 6
23 7 import os
24 8 import logging
25 9 import re
26 10 import sys
27 11
28 from subprocess import Popen, PIPE
29
30 12 from IPython.config.application import catch_config_error, LevelFormatter
31 13 from IPython.core import release
32 14 from IPython.core.crashhandler import CrashHandler
@@ -1,27 +1,8 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 """
4 The ipcluster application.
5
6 Authors:
7
8 * Brian Granger
9 * MinRK
10
11 """
3 """The ipcluster application."""
12 4 from __future__ import print_function
13 5
14 #-----------------------------------------------------------------------------
15 # Copyright (C) 2008-2011 The IPython Development Team
16 #
17 # Distributed under the terms of the BSD License. The full license is in
18 # the file COPYING, distributed as part of this software.
19 #-----------------------------------------------------------------------------
20
21 #-----------------------------------------------------------------------------
22 # Imports
23 #-----------------------------------------------------------------------------
24
25 6 import errno
26 7 import logging
27 8 import os
@@ -30,9 +11,8 b' import signal'
30 11
31 12 from subprocess import check_call, CalledProcessError, PIPE
32 13 import zmq
33 from zmq.eventloop import ioloop
34 14
35 from IPython.config.application import Application, boolean_flag, catch_config_error
15 from IPython.config.application import catch_config_error
36 16 from IPython.config.loader import Config
37 17 from IPython.core.application import BaseIPythonApplication
38 18 from IPython.core.profiledir import ProfileDir
@@ -355,7 +335,7 b' class IPClusterEngines(BaseParallelApplication):'
355 335 raise
356 336 self.engine_launcher.on_stop(self.engines_stopped_early)
357 337 if self.early_shutdown:
358 ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
338 self.loop.add_timeout(self.loop.time() + self.early_shutdown, self.engines_started_ok)
359 339
360 340 def engines_stopped_early(self, r):
361 341 if self.early_shutdown and not self._stopping:
@@ -393,8 +373,7 b' class IPClusterEngines(BaseParallelApplication):'
393 373 self.log.error("IPython cluster: stopping")
394 374 self.stop_engines()
395 375 # Wait a few seconds to let things shut down.
396 dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
397 dc.start()
376 self.loop.add_timeout(self.loop.time() + 3, self.loop.stop)
398 377
399 378 def sigint_handler(self, signum, frame):
400 379 self.log.debug("SIGINT received, stopping launchers...")
@@ -421,9 +400,8 b' class IPClusterEngines(BaseParallelApplication):'
421 400 if self.daemonize:
422 401 if os.name=='posix':
423 402 daemonize()
424
425 dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
426 dc.start()
403
404 self.loop.add_callback(self.start_engines)
427 405 # Now write the new pid file AFTER our new forked pid is active.
428 406 # self.write_pid_file()
429 407 try:
@@ -565,11 +543,11 b' class IPClusterStart(IPClusterEngines):'
565 543 if self.daemonize:
566 544 if os.name=='posix':
567 545 daemonize()
568
569 dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
570 dc.start()
571 dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
572 dc.start()
546
547 def start():
548 self.start_controller()
549 self.loop.add_timeout(self.loop.time() + self.delay, self.start_engines)
550 self.loop.add_callback(start)
573 551 # Now write the new pid file AFTER our new forked pid is active.
574 552 self.write_pid_file()
575 553 try:
@@ -294,8 +294,7 b' class LocalProcessLauncher(BaseLauncher):'
294 294 except Exception:
295 295 self.log.debug("interrupt failed")
296 296 pass
297 self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
298 self.killer.start()
297 self.killer = self.loop.add_timeout(self.loop.time() + delay, lambda : self.signal(SIGKILL))
299 298
300 299 # callbacks, etc:
301 300
@@ -16,7 +16,6 b' import time'
16 16 from datetime import datetime
17 17
18 18 import zmq
19 from zmq.eventloop import ioloop
20 19 from zmq.eventloop.zmqstream import ZMQStream
21 20
22 21 # internal:
@@ -25,7 +24,7 b' from IPython.utils.jsonutil import extract_dates'
25 24 from IPython.utils.localinterfaces import localhost
26 25 from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems
27 26 from IPython.utils.traitlets import (
28 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
27 HasTraits, Any, Instance, Integer, Unicode, Dict, Set, Tuple, DottedObjectName
29 28 )
30 29
31 30 from IPython.parallel import error, util
@@ -35,9 +34,6 b' from IPython.kernel.zmq.session import SessionFactory'
35 34
36 35 from .heartmonitor import HeartMonitor
37 36
38 #-----------------------------------------------------------------------------
39 # Code
40 #-----------------------------------------------------------------------------
41 37
42 38 def _passer(*args, **kwargs):
43 39 return
@@ -108,13 +104,13 b' class EngineConnector(HasTraits):'
108 104 id (int): engine ID
109 105 uuid (unicode): engine UUID
110 106 pending: set of msg_ids
111 stallback: DelayedCallback for stalled registration
107 stallback: tornado timeout for stalled registration
112 108 """
113 109
114 110 id = Integer(0)
115 111 uuid = Unicode()
116 112 pending = Set()
117 stallback = Instance(ioloop.DelayedCallback)
113 stallback = Any()
118 114
119 115
120 116 _db_shortcuts = {
@@ -339,13 +335,10 b' class HubFactory(RegistrationFactory):'
339 335 url = util.disambiguate_url(self.client_url('task'))
340 336 r.connect(url)
341 337
342 # convert seconds to msec
343 registration_timeout = 1000*self.registration_timeout
344
345 338 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
346 339 query=q, notifier=n, resubmit=r, db=self.db,
347 340 engine_info=self.engine_info, client_info=self.client_info,
348 log=self.log, registration_timeout=registration_timeout)
341 log=self.log, registration_timeout=self.registration_timeout)
349 342
350 343
351 344 class Hub(SessionFactory):
@@ -963,9 +956,11 b' class Hub(SessionFactory):'
963 956 self.finish_registration(heart)
964 957 else:
965 958 purge = lambda : self._purge_stalled_registration(heart)
966 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
967 dc.start()
968 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
959 t = self.loop.add_timeout(
960 self.loop.time() + self.registration_timeout,
961 purge,
962 )
963 self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=t)
969 964 else:
970 965 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
971 966
@@ -979,20 +974,15 b' class Hub(SessionFactory):'
979 974 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
980 975 return
981 976 self.log.info("registration::unregister_engine(%r)", eid)
982 # print (eid)
977
983 978 uuid = self.keytable[eid]
984 979 content=dict(id=eid, uuid=uuid)
985 980 self.dead_engines.add(uuid)
986 # self.ids.remove(eid)
987 # uuid = self.keytable.pop(eid)
988 #
989 # ec = self.engines.pop(eid)
990 # self.hearts.pop(ec.heartbeat)
991 # self.by_ident.pop(ec.queue)
992 # self.completed.pop(eid)
993 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
994 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
995 dc.start()
981
982 self.loop.add_timeout(
983 self.loop.time() + self.registration_timeout,
984 lambda : self._handle_stranded_msgs(eid, uuid),
985 )
996 986 ############## TODO: HANDLE IT ################
997 987
998 988 self._save_engine_state()
@@ -1040,7 +1030,7 b' class Hub(SessionFactory):'
1040 1030 return
1041 1031 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1042 1032 if ec.stallback is not None:
1043 ec.stallback.stop()
1033 self.loop.remove_timeout(ec.stallback)
1044 1034 eid = ec.id
1045 1035 self.ids.add(eid)
1046 1036 self.keytable[eid] = ec.uuid
@@ -1133,8 +1123,7 b' class Hub(SessionFactory):'
1133 1123 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1134 1124 # also notify other clients of shutdown
1135 1125 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1136 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1137 dc.start()
1126 self.loop.add_timeout(self.loop.time() + 1, self._shutdown)
1138 1127
1139 1128 def _shutdown(self):
1140 1129 self.log.info("hub::hub shutting down.")
@@ -321,8 +321,9 b' class TaskScheduler(SessionFactory):'
321 321 # wait 5 seconds before cleaning up pending jobs, since the results might
322 322 # still be incoming
323 323 if self.pending[uid]:
324 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
325 dc.start()
324 self.loop.add_timeout(self.loop.time() + 5,
325 lambda : self.handle_stranded_tasks(uid),
326 )
326 327 else:
327 328 self.completed.pop(uid)
328 329 self.failed.pop(uid)
@@ -25,7 +25,6 b' from IPython.parallel.controller.heartmonitor import Heart'
25 25 from IPython.parallel.factory import RegistrationFactory
26 26 from IPython.parallel.util import disambiguate_url
27 27
28 from IPython.kernel.zmq.session import Message
29 28 from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel
30 29 from IPython.kernel.zmq.kernelapp import IPKernelApp
31 30
@@ -155,7 +154,7 b' class EngineFactory(RegistrationFactory):'
155 154
156 155 def complete_registration(self, msg, connect, maybe_tunnel):
157 156 # print msg
158 self._abort_dc.stop()
157 self.loop.remove_timeout(self._abort_dc)
159 158 ctx = self.context
160 159 loop = self.loop
161 160 identity = self.bident
@@ -293,9 +292,10 b' class EngineFactory(RegistrationFactory):'
293 292
294 293
295 294 def start(self):
296 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
297 dc.start()
298 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
299 self._abort_dc.start()
295 loop = self.loop
296 def _start():
297 self.register()
298 loop.add_timeout(loop.time() + self.timeout, self.abort)
299 self.loop.add_callback(_start)
300 300
301 301
General Comments 0
You need to be logged in to leave comments. Login now