diff --git a/IPython/html/services/clusters/clustermanager.py b/IPython/html/services/clusters/clustermanager.py
index fecde70..b159fc2 100644
--- a/IPython/html/services/clusters/clustermanager.py
+++ b/IPython/html/services/clusters/clustermanager.py
@@ -17,10 +17,9 @@ Authors:
#-----------------------------------------------------------------------------
from tornado import web
-from zmq.eventloop import ioloop
from IPython.config.configurable import LoggingConfigurable
-from IPython.utils.traitlets import Dict, Instance, CFloat
+from IPython.utils.traitlets import Dict, Instance, Float
from IPython.core.profileapp import list_profiles_in
from IPython.core.profiledir import ProfileDir
from IPython.utils import py3compat
@@ -38,7 +37,7 @@ class ClusterManager(LoggingConfigurable):
profiles = Dict()
- delay = CFloat(1., config=True,
+ delay = Float(1., config=True,
help="delay (in s) between starting the controller and the engines")
loop = Instance('zmq.eventloop.ioloop.IOLoop')
@@ -133,11 +132,13 @@ class ClusterManager(LoggingConfigurable):
esl.stop()
clean_data()
cl.on_stop(controller_stopped)
-
- dc = ioloop.DelayedCallback(lambda: cl.start(), 0, self.loop)
- dc.start()
- dc = ioloop.DelayedCallback(lambda: esl.start(n), 1000*self.delay, self.loop)
- dc.start()
+ loop = self.loop
+
+ def start():
+ """start the controller, then the engines after a delay"""
+ cl.start()
+ loop.add_timeout(self.loop.time() + self.delay, lambda : esl.start(n))
+ self.loop.add_callback(start)
self.log.debug('Cluster started')
data['controller_launcher'] = cl
diff --git a/IPython/parallel/apps/baseapp.py b/IPython/parallel/apps/baseapp.py
index fc06304..e538c04 100644
--- a/IPython/parallel/apps/baseapp.py
+++ b/IPython/parallel/apps/baseapp.py
@@ -1,32 +1,14 @@
# encoding: utf-8
"""
The Base Application class for IPython.parallel apps
-
-Authors:
-
-* Brian Granger
-* Min RK
-
"""
-#-----------------------------------------------------------------------------
-# Copyright (C) 2008-2011 The IPython Development Team
-#
-# Distributed under the terms of the BSD License. The full license is in
-# the file COPYING, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-#-----------------------------------------------------------------------------
-# Imports
-#-----------------------------------------------------------------------------
import os
import logging
import re
import sys
-from subprocess import Popen, PIPE
-
from IPython.config.application import catch_config_error, LevelFormatter
from IPython.core import release
from IPython.core.crashhandler import CrashHandler
diff --git a/IPython/parallel/apps/ipclusterapp.py b/IPython/parallel/apps/ipclusterapp.py
index 8777bad..f476f42 100755
--- a/IPython/parallel/apps/ipclusterapp.py
+++ b/IPython/parallel/apps/ipclusterapp.py
@@ -1,27 +1,8 @@
#!/usr/bin/env python
# encoding: utf-8
-"""
-The ipcluster application.
-
-Authors:
-
-* Brian Granger
-* MinRK
-
-"""
+"""The ipcluster application."""
from __future__ import print_function
-#-----------------------------------------------------------------------------
-# Copyright (C) 2008-2011 The IPython Development Team
-#
-# Distributed under the terms of the BSD License. The full license is in
-# the file COPYING, distributed as part of this software.
-#-----------------------------------------------------------------------------
-
-#-----------------------------------------------------------------------------
-# Imports
-#-----------------------------------------------------------------------------
-
import errno
import logging
import os
@@ -30,9 +11,8 @@ import signal
from subprocess import check_call, CalledProcessError, PIPE
import zmq
-from zmq.eventloop import ioloop
-from IPython.config.application import Application, boolean_flag, catch_config_error
+from IPython.config.application import catch_config_error
from IPython.config.loader import Config
from IPython.core.application import BaseIPythonApplication
from IPython.core.profiledir import ProfileDir
@@ -355,7 +335,7 @@ class IPClusterEngines(BaseParallelApplication):
raise
self.engine_launcher.on_stop(self.engines_stopped_early)
if self.early_shutdown:
- ioloop.DelayedCallback(self.engines_started_ok, self.early_shutdown*1000, self.loop).start()
+ self.loop.add_timeout(self.loop.time() + self.early_shutdown, self.engines_started_ok)
def engines_stopped_early(self, r):
if self.early_shutdown and not self._stopping:
@@ -393,8 +373,7 @@ class IPClusterEngines(BaseParallelApplication):
self.log.error("IPython cluster: stopping")
self.stop_engines()
# Wait a few seconds to let things shut down.
- dc = ioloop.DelayedCallback(self.loop.stop, 3000, self.loop)
- dc.start()
+ self.loop.add_timeout(self.loop.time() + 3, self.loop.stop)
def sigint_handler(self, signum, frame):
self.log.debug("SIGINT received, stopping launchers...")
@@ -421,9 +400,8 @@ class IPClusterEngines(BaseParallelApplication):
if self.daemonize:
if os.name=='posix':
daemonize()
-
- dc = ioloop.DelayedCallback(self.start_engines, 0, self.loop)
- dc.start()
+
+ self.loop.add_callback(self.start_engines)
# Now write the new pid file AFTER our new forked pid is active.
# self.write_pid_file()
try:
@@ -565,11 +543,11 @@ class IPClusterStart(IPClusterEngines):
if self.daemonize:
if os.name=='posix':
daemonize()
-
- dc = ioloop.DelayedCallback(self.start_controller, 0, self.loop)
- dc.start()
- dc = ioloop.DelayedCallback(self.start_engines, 1000*self.delay, self.loop)
- dc.start()
+
+ def start():
+ self.start_controller()
+ self.loop.add_timeout(self.loop.time() + self.delay, self.start_engines)
+ self.loop.add_callback(start)
# Now write the new pid file AFTER our new forked pid is active.
self.write_pid_file()
try:
diff --git a/IPython/parallel/apps/launcher.py b/IPython/parallel/apps/launcher.py
index 8479c52..a36accc 100644
--- a/IPython/parallel/apps/launcher.py
+++ b/IPython/parallel/apps/launcher.py
@@ -294,8 +294,7 @@ class LocalProcessLauncher(BaseLauncher):
except Exception:
self.log.debug("interrupt failed")
pass
- self.killer = ioloop.DelayedCallback(lambda : self.signal(SIGKILL), delay*1000, self.loop)
- self.killer.start()
+ self.killer = self.loop.add_timeout(self.loop.time() + delay, lambda : self.signal(SIGKILL))
# callbacks, etc:
diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py
index 40e55cc..d4f162a 100644
--- a/IPython/parallel/controller/hub.py
+++ b/IPython/parallel/controller/hub.py
@@ -16,7 +16,6 @@ import time
from datetime import datetime
import zmq
-from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
# internal:
@@ -25,7 +24,7 @@ from IPython.utils.jsonutil import extract_dates
from IPython.utils.localinterfaces import localhost
from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems
from IPython.utils.traitlets import (
- HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
+ HasTraits, Any, Instance, Integer, Unicode, Dict, Set, Tuple, DottedObjectName
)
from IPython.parallel import error, util
@@ -35,9 +34,6 @@ from IPython.kernel.zmq.session import SessionFactory
from .heartmonitor import HeartMonitor
-#-----------------------------------------------------------------------------
-# Code
-#-----------------------------------------------------------------------------
def _passer(*args, **kwargs):
return
@@ -108,13 +104,13 @@ class EngineConnector(HasTraits):
id (int): engine ID
uuid (unicode): engine UUID
pending: set of msg_ids
- stallback: DelayedCallback for stalled registration
+ stallback: tornado timeout for stalled registration
"""
id = Integer(0)
uuid = Unicode()
pending = Set()
- stallback = Instance(ioloop.DelayedCallback)
+ stallback = Any()
_db_shortcuts = {
@@ -339,13 +335,10 @@ class HubFactory(RegistrationFactory):
url = util.disambiguate_url(self.client_url('task'))
r.connect(url)
- # convert seconds to msec
- registration_timeout = 1000*self.registration_timeout
-
self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
query=q, notifier=n, resubmit=r, db=self.db,
engine_info=self.engine_info, client_info=self.client_info,
- log=self.log, registration_timeout=registration_timeout)
+ log=self.log, registration_timeout=self.registration_timeout)
class Hub(SessionFactory):
@@ -963,9 +956,11 @@ class Hub(SessionFactory):
self.finish_registration(heart)
else:
purge = lambda : self._purge_stalled_registration(heart)
- dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
- dc.start()
- self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
+ t = self.loop.add_timeout(
+ self.loop.time() + self.registration_timeout,
+ purge,
+ )
+ self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=t)
else:
self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
@@ -979,20 +974,15 @@ class Hub(SessionFactory):
self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
return
self.log.info("registration::unregister_engine(%r)", eid)
- # print (eid)
+
uuid = self.keytable[eid]
content=dict(id=eid, uuid=uuid)
self.dead_engines.add(uuid)
- # self.ids.remove(eid)
- # uuid = self.keytable.pop(eid)
- #
- # ec = self.engines.pop(eid)
- # self.hearts.pop(ec.heartbeat)
- # self.by_ident.pop(ec.queue)
- # self.completed.pop(eid)
- handleit = lambda : self._handle_stranded_msgs(eid, uuid)
- dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
- dc.start()
+
+ self.loop.add_timeout(
+ self.loop.time() + self.registration_timeout,
+ lambda : self._handle_stranded_msgs(eid, uuid),
+ )
############## TODO: HANDLE IT ################
self._save_engine_state()
@@ -1040,7 +1030,7 @@ class Hub(SessionFactory):
return
self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
if ec.stallback is not None:
- ec.stallback.stop()
+ self.loop.remove_timeout(ec.stallback)
eid = ec.id
self.ids.add(eid)
self.keytable[eid] = ec.uuid
@@ -1133,8 +1123,7 @@ class Hub(SessionFactory):
self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
# also notify other clients of shutdown
self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
- dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
- dc.start()
+ self.loop.add_timeout(self.loop.time() + 1, self._shutdown)
def _shutdown(self):
self.log.info("hub::hub shutting down.")
diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py
index 1894b93..fbf07c4 100644
--- a/IPython/parallel/controller/scheduler.py
+++ b/IPython/parallel/controller/scheduler.py
@@ -321,8 +321,9 @@ class TaskScheduler(SessionFactory):
# wait 5 seconds before cleaning up pending jobs, since the results might
# still be incoming
if self.pending[uid]:
- dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
- dc.start()
+ self.loop.add_timeout(self.loop.time() + 5,
+ lambda : self.handle_stranded_tasks(uid),
+ )
else:
self.completed.pop(uid)
self.failed.pop(uid)
diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py
index 326a690..b31b16d 100644
--- a/IPython/parallel/engine/engine.py
+++ b/IPython/parallel/engine/engine.py
@@ -25,7 +25,6 @@ from IPython.parallel.controller.heartmonitor import Heart
from IPython.parallel.factory import RegistrationFactory
from IPython.parallel.util import disambiguate_url
-from IPython.kernel.zmq.session import Message
from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel
from IPython.kernel.zmq.kernelapp import IPKernelApp
@@ -155,7 +154,7 @@ class EngineFactory(RegistrationFactory):
def complete_registration(self, msg, connect, maybe_tunnel):
# print msg
- self._abort_dc.stop()
+ self.loop.remove_timeout(self._abort_dc)
ctx = self.context
loop = self.loop
identity = self.bident
@@ -293,9 +292,10 @@ class EngineFactory(RegistrationFactory):
def start(self):
- dc = ioloop.DelayedCallback(self.register, 0, self.loop)
- dc.start()
- self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
- self._abort_dc.start()
+ loop = self.loop
+ def _start():
+ self.register()
+ loop.add_timeout(loop.time() + self.timeout, self.abort)
+ self.loop.add_callback(_start)