diff --git a/IPython/html/services/clusters/clustermanager.py b/IPython/html/services/clusters/clustermanager.py index fecde70..b159fc2 100644 --- a/IPython/html/services/clusters/clustermanager.py +++ b/IPython/html/services/clusters/clustermanager.py @@ -17,10 +17,9 @@ Authors: #----------------------------------------------------------------------------- from tornado import web -from zmq.eventloop import ioloop from IPython.config.configurable import LoggingConfigurable -from IPython.utils.traitlets import Dict, Instance, CFloat +from IPython.utils.traitlets import Dict, Instance, Float from IPython.core.profileapp import list_profiles_in from IPython.core.profiledir import ProfileDir from IPython.utils import py3compat @@ -38,7 +37,7 @@ class ClusterManager(LoggingConfigurable): profiles = Dict() - delay = CFloat(1., config=True, + delay = Float(1., config=True, help="delay (in s) between starting the controller and the engines") loop = Instance('zmq.eventloop.ioloop.IOLoop') @@ -133,11 +132,13 @@ class ClusterManager(LoggingConfigurable): esl.stop() clean_data() cl.on_stop(controller_stopped) - - dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop) - dc.start() - dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop) - dc.start() + loop = self.loop + + def start(): + """start the controller, then the engines after a delay""" + cl.start() + loop.add_timeout(self.loop.time() + self.delay, lambda : esl.start(n)) + self.loop.add_callback(start) self.log.debug('Cluster started') data['controller_launcher'] = cl diff --git a/IPython/parallel/apps/baseapp.py b/IPython/parallel/apps/baseapp.py index fc06304..e538c04 100644 --- a/IPython/parallel/apps/baseapp.py +++ b/IPython/parallel/apps/baseapp.py @@ -1,32 +1,14 @@ # encoding: utf-8 """ The Base Application class for IPython.parallel apps - -Authors: - -* Brian Granger -* Min RK - """ -#----------------------------------------------------------------------------- -# Copyright (C) 2008-2011 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#----------------------------------------------------------------------------- - -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- import os import logging import re import sys -from subprocess import Popen, PIPE - from IPython.config.application import catch_config_error, LevelFormatter from IPython.core import release from IPython.core.crashhandler import CrashHandler diff --git a/IPython/parallel/apps/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py index 8777bad..f476f42 100755 --- a/IPython/parallel/apps/ipclusterapp.py +++ b/IPython/parallel/apps/ipclusterapp.py @@ -1,27 +1,8 @@ #!/usr/bin/env python # encoding: utf-8 -""" -The ipcluster application. - -Authors: - -* Brian Granger -* MinRK - -""" +"""The ipcluster application.""" from __future__ import print_function -#----------------------------------------------------------------------------- -# Copyright (C) 2008-2011 The IPython Development Team -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#----------------------------------------------------------------------------- - -#----------------------------------------------------------------------------- -# Imports -#----------------------------------------------------------------------------- - import errno import logging import os @@ -30,9 +11,8 @@ import signal from subprocess import check_call, CalledProcessError, PIPE import zmq -from zmq.eventloop import ioloop -from IPython.config.application import Application, boolean_flag, catch_config_error +from IPython.config.application import catch_config_error from IPython.config.loader import Config from IPython.core.application import BaseIPythonApplication from IPython.core.profiledir import ProfileDir @@ -355,7 +335,7 @@ class IPClusterEngines(BaseParallelApplication): raise self.engine_launcher.on_stop(self.engines_stopped_early) if self.early_shutdown: - ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start() + self.loop.add_timeout(self.loop.time() + self.early_shutdown, self.engines_started_ok) def engines_stopped_early(self, r): if self.early_shutdown and not self._stopping: @@ -393,8 +373,7 @@ class IPClusterEngines(BaseParallelApplication): self.log.error("IPython cluster: stopping") self.stop_engines() # Wait a few seconds to let things shut down. - dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop) - dc.start() + self.loop.add_timeout(self.loop.time() + 3, self.loop.stop) def sigint_handler(self, signum, frame): self.log.debug("SIGINT received, stopping launchers...") @@ -421,9 +400,8 @@ class IPClusterEngines(BaseParallelApplication): if self.daemonize: if os.name=='posix': daemonize() - - dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop) - dc.start() + + self.loop.add_callback(self.start_engines) # Now write the new pid file AFTER our new forked pid is active. # self.write_pid_file() try: @@ -565,11 +543,11 @@ class IPClusterStart(IPClusterEngines): if self.daemonize: if os.name=='posix': daemonize() - - dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop) - dc.start() - dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop) - dc.start() + + def start(): + self.start_controller() + self.loop.add_timeout(self.loop.time() + self.delay, self.start_engines) + self.loop.add_callback(start) # Now write the new pid file AFTER our new forked pid is active. self.write_pid_file() try: diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py index 8479c52..a36accc 100644 --- a/IPython/parallel/apps/launcher.py +++ b/IPython/parallel/apps/launcher.py @@ -294,8 +294,7 @@ class LocalProcessLauncher(BaseLauncher): except Exception: self.log.debug("interrupt failed") pass - self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop) - self.killer.start() + self.killer = self.loop.add_timeout(self.loop.time() + delay, lambda : self.signal(SIGKILL)) # callbacks, etc: diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 40e55cc..d4f162a 100644 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -16,7 +16,6 @@ import time from datetime import datetime import zmq -from zmq.eventloop import ioloop from zmq.eventloop.zmqstream import ZMQStream # internal: @@ -25,7 +24,7 @@ from IPython.utils.jsonutil import extract_dates from IPython.utils.localinterfaces import localhost from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems from IPython.utils.traitlets import ( - HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName + HasTraits, Any, Instance, Integer, Unicode, Dict, Set, Tuple, DottedObjectName ) from IPython.parallel import error, util @@ -35,9 +34,6 @@ from IPython.kernel.zmq.session import SessionFactory from .heartmonitor import HeartMonitor -#----------------------------------------------------------------------------- -# Code -#----------------------------------------------------------------------------- def _passer(*args, **kwargs): return @@ -108,13 +104,13 @@ class EngineConnector(HasTraits): id (int): engine ID uuid (unicode): engine UUID pending: set of msg_ids - stallback: DelayedCallback for stalled registration + stallback: tornado timeout for stalled registration """ id = Integer(0) uuid = Unicode() pending = Set() - stallback = Instance(ioloop.DelayedCallback) + stallback = Any() _db_shortcuts = { @@ -339,13 +335,10 @@ class HubFactory(RegistrationFactory): url = util.disambiguate_url(self.client_url('task')) r.connect(url) - # convert seconds to msec - registration_timeout = 1000*self.registration_timeout - self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor, query=q, notifier=n, resubmit=r, db=self.db, engine_info=self.engine_info, client_info=self.client_info, - log=self.log, registration_timeout=registration_timeout) + log=self.log, registration_timeout=self.registration_timeout) class Hub(SessionFactory): @@ -963,9 +956,11 @@ class Hub(SessionFactory): self.finish_registration(heart) else: purge = lambda : self._purge_stalled_registration(heart) - dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) - dc.start() - self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc) + t = self.loop.add_timeout( + self.loop.time() + self.registration_timeout, + purge, + ) + self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=t) else: self.log.error("registration::registration %i failed: %r", eid, content['evalue']) @@ -979,20 +974,15 @@ class Hub(SessionFactory): self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True) return self.log.info("registration::unregister_engine(%r)", eid) - # print (eid) + uuid = self.keytable[eid] content=dict(id=eid, uuid=uuid) self.dead_engines.add(uuid) - # self.ids.remove(eid) - # uuid = self.keytable.pop(eid) - # - # ec = self.engines.pop(eid) - # self.hearts.pop(ec.heartbeat) - # self.by_ident.pop(ec.queue) - # self.completed.pop(eid) - handleit = lambda : self._handle_stranded_msgs(eid, uuid) - dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop) - dc.start() + + self.loop.add_timeout( + self.loop.time() + self.registration_timeout, + lambda : self._handle_stranded_msgs(eid, uuid), + ) ############## TODO: HANDLE IT ################ self._save_engine_state() @@ -1040,7 +1030,7 @@ class Hub(SessionFactory): return self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid) if ec.stallback is not None: - ec.stallback.stop() + self.loop.remove_timeout(ec.stallback) eid = ec.id self.ids.add(eid) self.keytable[eid] = ec.uuid @@ -1133,8 +1123,7 @@ class Hub(SessionFactory): self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id) # also notify other clients of shutdown self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'}) - dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop) - dc.start() + self.loop.add_timeout(self.loop.time() + 1, self._shutdown) def _shutdown(self): self.log.info("hub::hub shutting down.") diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 1894b93..fbf07c4 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -321,8 +321,9 @@ class TaskScheduler(SessionFactory): # wait 5 seconds before cleaning up pending jobs, since the results might # still be incoming if self.pending[uid]: - dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop) - dc.start() + self.loop.add_timeout(self.loop.time() + 5, + lambda : self.handle_stranded_tasks(uid), + ) else: self.completed.pop(uid) self.failed.pop(uid) diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index 326a690..b31b16d 100644 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -25,7 +25,6 @@ from IPython.parallel.controller.heartmonitor import Heart from IPython.parallel.factory import RegistrationFactory from IPython.parallel.util import disambiguate_url -from IPython.kernel.zmq.session import Message from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel from IPython.kernel.zmq.kernelapp import IPKernelApp @@ -155,7 +154,7 @@ class EngineFactory(RegistrationFactory): def complete_registration(self, msg, connect, maybe_tunnel): # print msg - self._abort_dc.stop() + self.loop.remove_timeout(self._abort_dc) ctx = self.context loop = self.loop identity = self.bident @@ -293,9 +292,10 @@ class EngineFactory(RegistrationFactory): def start(self): - dc = ioloop.DelayedCallback(self.register, 0, self.loop) - dc.start() - self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) - self._abort_dc.start() + loop = self.loop + def _start(): + self.register() + loop.add_timeout(loop.time() + self.timeout, self.abort) + self.loop.add_callback(_start)