##// END OF EJS Templates
fire 'dead' callbacks when kernel dies before its first heartbeat
fire 'dead' callbacks when kernel dies before its first heartbeat

File last commit:

r9372:37f32253
r10314:e0eabd78
Show More
engine.py
305 lines | 12.8 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 """A simple engine that talks to a controller over 0MQ.
it handles registration, etc. and launches a kernel
MinRK
cleanup pass
r3644 connected to the Controller's Schedulers.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Min RK
MinRK
prep newparallel for rebase...
r3539 """
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
resort imports in a cleaner order
r3631
MinRK
prep newparallel for rebase...
r3539 import sys
import time
MinRK
add ssh tunneling to Engine...
r4585 from getpass import getpass
MinRK
prep newparallel for rebase...
r3539
import zmq
from zmq.eventloop import ioloop, zmqstream
MinRK
add ssh tunneling to Engine...
r4585 from IPython.external.ssh import tunnel
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # internal
W. Trevor King
parallel: Use utils.localinterfaces.LOCALHOST
r9254 from IPython.utils.localinterfaces import LOCALHOST
MinRK
add ssh tunneling to Engine...
r4585 from IPython.utils.traitlets import (
Jan Schulz
Change types of some config items and adjust code accordingly...
r8287 Instance, Dict, Integer, Type, Float, Integer, Unicode, CBytes, Bool
MinRK
add ssh tunneling to Engine...
r4585 )
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 from IPython.utils.py3compat import cast_bytes
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.controller.heartmonitor import Heart
from IPython.parallel.factory import RegistrationFactory
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 from IPython.parallel.util import disambiguate_url
MinRK
organize IPython.parallel into subpackages
r3673
MinRK
mv IPython.zmq to IPython.kernel.zmq
r9372 from IPython.kernel.zmq.session import Message
from IPython.kernel.zmq.ipkernel import Kernel
from IPython.kernel.zmq.kernelapp import IPKernelApp
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
r3604 class EngineFactory(RegistrationFactory):
MinRK
prep newparallel for rebase...
r3539 """IPython engine"""
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # configurables:
MinRK
mv IPython.zmq to IPython.kernel.zmq
r9372 out_stream_factory=Type('IPython.kernel.zmq.iostream.OutStream', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""The OutStream for handling stdout/err.
MinRK
mv IPython.zmq to IPython.kernel.zmq
r9372 Typically 'IPython.kernel.zmq.iostream.OutStream'""")
display_hook_factory=Type('IPython.kernel.zmq.displayhook.ZMQDisplayHook', config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""The class for handling displayhook.
MinRK
mv IPython.zmq to IPython.kernel.zmq
r9372 Typically 'IPython.kernel.zmq.displayhook.ZMQDisplayHook'""")
MinRK
cleanup parallel traits...
r3988 location=Unicode(config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""The location (an IP address) of the controller. This is
used for disambiguating URLs, to determine whether
loopback should be used to connect or the public address.""")
Jan Schulz
Change types of some config items and adjust code accordingly...
r8287 timeout=Float(5.0, config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help="""The time (in seconds) to wait for the Controller to respond
to registration requests before giving up.""")
Jan Schulz
Enable heartbeat montoring on default...
r8576 max_heartbeat_misses=Integer(50, config=True,
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283 help="""The maximum number of times a check for the heartbeat ping of a
Jan Schulz
Rework the heartbeat checking (configureable, period from hub)...
r8570 controller can be missed before shutting down the engine.
If set to 0, the check is disabled.""")
MinRK
add ssh tunneling to Engine...
r4585 sshserver=Unicode(config=True,
help="""The SSH server to use for tunneling connections to the Controller.""")
sshkey=Unicode(config=True,
MinRK
specify sshkey is *private*
r4589 help="""The SSH private key file to use when tunneling connections to the Controller.""")
MinRK
add ssh tunneling to Engine...
r4585 paramiko=Bool(sys.platform == 'win32', config=True,
help="""Whether to use paramiko instead of openssh for tunnels.""")
Bernardo B. Marques
remove all trailling spaces
r4872
Jan Schulz
Rework the heartbeat checking (configureable, period from hub)...
r8570
MinRK
Refactor newparallel to use Config system...
r3604 # not configurable:
MinRK
simplify IPython.parallel connections...
r7889 connection_info = Dict()
user_ns = Dict()
id = Integer(allow_none=True)
registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
kernel = Instance(Kernel)
Jan Schulz
Rework the heartbeat checking (configureable, period from hub)...
r8570 hb_check_period=Integer()
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283
# States for the heartbeat monitoring
Jan Schulz
Change initial values of monitor states for Py3 compatibility
r8286 # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that
# during the first check no "missed" ping is reported. Must be floats for Python 3 compatibility.
_hb_last_pinged = 0.0
_hb_last_monitored = 0.0
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283 _hb_missed_beats = 0
# The zmq Stream which receives the pings from the Heart
_hb_listener = None
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
enforce ascii identities in parallel code...
r4160 bident = CBytes()
ident = Unicode()
def _ident_changed(self, name, old, new):
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 self.bident = cast_bytes(new)
MinRK
add ssh tunneling to Engine...
r4585 using_ssh=Bool(False)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 def __init__(self, **kwargs):
MinRK
Refactor newparallel to use Config system...
r3604 super(EngineFactory, self).__init__(**kwargs)
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.ident = self.session.session
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add ssh tunneling to Engine...
r4585 def init_connector(self):
"""construct connection function, which handles tunnels."""
self.using_ssh = bool(self.sshkey or self.sshserver)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add ssh tunneling to Engine...
r4585 if self.sshkey and not self.sshserver:
# We are using ssh directly to the controller, tunneling localhost to localhost
self.sshserver = self.url.split('://')[1].split(':')[0]
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add ssh tunneling to Engine...
r4585 if self.using_ssh:
if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
password=False
else:
password = getpass("SSH Password for %s: "%self.sshserver)
else:
password = False
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add ssh tunneling to Engine...
r4585 def connect(s, url):
url = disambiguate_url(url, self.location)
if self.using_ssh:
MinRK
simplify IPython.parallel connections...
r7889 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
MinRK
add ssh tunneling to Engine...
r4585 return tunnel.tunnel_connection(s, url, self.sshserver,
keyfile=self.sshkey, paramiko=self.paramiko,
password=password,
)
else:
return s.connect(url)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add ssh tunneling to Engine...
r4585 def maybe_tunnel(url):
"""like connect, but don't complete the connection (for use by heartbeat)"""
url = disambiguate_url(url, self.location)
if self.using_ssh:
MinRK
simplify IPython.parallel connections...
r7889 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
MinRK
add ssh tunneling to Engine...
r4585 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
keyfile=self.sshkey, paramiko=self.paramiko,
password=password,
)
MinRK
simplify IPython.parallel connections...
r7889 return str(url)
MinRK
add ssh tunneling to Engine...
r4585 return connect, maybe_tunnel
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def register(self):
MinRK
Refactor newparallel to use Config system...
r3604 """send the registration_request"""
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
allow engines to wait for url_files to arrive...
r4120 self.log.info("Registering with controller at %s"%self.url)
MinRK
add ssh tunneling to Engine...
r4585 ctx = self.context
connect,maybe_tunnel = self.init_connector()
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 reg = ctx.socket(zmq.DEALER)
MinRK
add ssh tunneling to Engine...
r4585 reg.setsockopt(zmq.IDENTITY, self.bident)
connect(reg, self.url)
self.registrar = zmqstream.ZMQStream(reg, self.loop)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
enables resume of ipcontroller...
r7891 content = dict(uuid=self.ident)
MinRK
add ssh tunneling to Engine...
r4585 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
MinRK
added exec_key and fixed client.shutdown
r3575 # print (self.session.key)
MinRK
simplify IPython.parallel connections...
r7889 self.session.send(self.registrar, "registration_request", content=content)
Bernardo B. Marques
remove all trailling spaces
r4872
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283 def _report_ping(self, msg):
"""Callback for when the heartmonitor.Heart receives a ping"""
Jan Schulz
Rework the heartbeat checking (configureable, period from hub)...
r8570 #self.log.debug("Received a ping: %s", msg)
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283 self._hb_last_pinged = time.time()
MinRK
add ssh tunneling to Engine...
r4585 def complete_registration(self, msg, connect, maybe_tunnel):
MinRK
prep newparallel for rebase...
r3539 # print msg
MinRK
persist connection data to disk as json
r3614 self._abort_dc.stop()
MinRK
Refactor newparallel to use Config system...
r3604 ctx = self.context
loop = self.loop
MinRK
enforce ascii identities in parallel code...
r4160 identity = self.bident
MinRK
prep newparallel for rebase...
r3539 idents,msg = self.session.feed_identities(msg)
MinRK
simplify IPython.parallel connections...
r7889 msg = self.session.unserialize(msg)
content = msg['content']
info = self.connection_info
MinRK
use individual ports, rather than full urls in connection files
r7890 def url(key):
"""get zmq url for given channel"""
return str(info["interface"] + ":%i" % info[key])
MinRK
simplify IPython.parallel connections...
r7889 if content['status'] == 'ok':
self.id = int(content['id'])
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add ssh tunneling to Engine...
r4585 # launch heartbeat
# possibly forward hb ports with tunnels
MinRK
use individual ports, rather than full urls in connection files
r7890 hb_ping = maybe_tunnel(url('hb_ping'))
hb_pong = maybe_tunnel(url('hb_pong'))
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283
Jan Schulz
Rework the heartbeat checking (configureable, period from hub)...
r8570 hb_monitor = None
if self.max_heartbeat_misses > 0:
Jan Schulz
Run code only when monitoring is enabled...
r8572 # Add a monitor socket which will record the last time a ping was seen
mon = self.context.socket(zmq.SUB)
W. Trevor King
parallel: Use utils.localinterfaces.LOCALHOST
r9254 mport = mon.bind_to_random_port('tcp://%s' % LOCALHOST)
Jan Schulz
Run code only when monitoring is enabled...
r8572 mon.setsockopt(zmq.SUBSCRIBE, b"")
self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
self._hb_listener.on_recv(self._report_ping)
W. Trevor King
parallel: Use utils.localinterfaces.LOCALHOST
r9254 hb_monitor = "tcp://%s:%i" % (LOCALHOST, mport)
MinRK
simplify IPython.parallel connections...
r7889
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283 heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity)
MinRK
add ssh tunneling to Engine...
r4585 heart.start()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
simplify IPython.parallel connections...
r7889 # create Shell Connections (MUX, Task, etc.):
MinRK
use individual ports, rather than full urls in connection files
r7890 shell_addrs = url('mux'), url('task')
MinRK
simplify IPython.parallel connections...
r7889
# Use only one shell stream for mux and tasks
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
MinRK
remove all PAIR sockets, Merge registration+query
r3657 stream.setsockopt(zmq.IDENTITY, identity)
shell_streams = [stream]
MinRK
Refactor newparallel to use Config system...
r3604 for addr in shell_addrs:
MinRK
add ssh tunneling to Engine...
r4585 connect(stream, addr)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 # control stream:
MinRK
use individual ports, rather than full urls in connection files
r7890 control_addr = url('control')
MinRK
use ROUTER/DEALER socket names instead of XREP/XREQ...
r4725 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
MinRK
Refactor newparallel to use Config system...
r3604 control_stream.setsockopt(zmq.IDENTITY, identity)
MinRK
add ssh tunneling to Engine...
r4585 connect(control_stream, control_addr)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Refactor newparallel to use Config system...
r3604 # create iopub stream:
MinRK
use individual ports, rather than full urls in connection files
r7890 iopub_addr = url('iopub')
MinRK
do not use ZMQStream for IOPub...
r6804 iopub_socket = ctx.socket(zmq.PUB)
iopub_socket.setsockopt(zmq.IDENTITY, identity)
connect(iopub_socket, iopub_addr)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
don't use history files in Engines
r6801 # disable history:
self.config.HistoryManager.hist_file = ':memory:'
# Redirect input streams and set a display hook.
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 if self.out_stream_factory:
MinRK
do not use ZMQStream for IOPub...
r6804 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
MinRK
do not use ZMQStream for IOPub...
r6804 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 if self.display_hook_factory:
MinRK
do not use ZMQStream for IOPub...
r6804 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
MinRK
enforce ascii identities in parallel code...
r4160
Bernardo B. Marques
remove all trailling spaces
r4872 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
MinRK
do not use ZMQStream for IOPub...
r6804 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
MinRK
use IPython.zmq.kernel in parallel Engines
r6791 loop=loop, user_ns=self.user_ns, log=self.log)
MinRK
use KernelApp.exec_lines/files in IPEngineApp...
r8062
MinRK
add topic to display publisher, and fix set_parent for apply_requests
r6834 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
MinRK
use KernelApp.exec_lines/files in IPEngineApp...
r8062
Jan Schulz
Rework the heartbeat checking (configureable, period from hub)...
r8570
# periodically check the heartbeat pings of the controller
# Should be started here and not in "start()" so that the right period can be taken
# from the hubs HeartBeatMonitor.period
if self.max_heartbeat_misses > 0:
# Use a slightly bigger check period than the hub signal period to not warn unnecessary
self.hb_check_period = int(content['hb_period'])+10
self.log.info("Starting to monitor the heartbeat signal from the hub every %i ms." , self.hb_check_period)
self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop)
self._hb_reporter.start()
else:
self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.")
MinRK
use KernelApp.exec_lines/files in IPEngineApp...
r8062 # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
app.init_profile_dir()
app.init_code()
MinRK
Refactor newparallel to use Config system...
r3604 self.kernel.start()
MinRK
prep newparallel for rebase...
r3539 else:
MinRK
persist connection data to disk as json
r3614 self.log.fatal("Registration Failed: %s"%msg)
MinRK
prep newparallel for rebase...
r3539 raise Exception("Registration Failed: %s"%msg)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
rework logging connections
r3610 self.log.info("Completed registration with id %i"%self.id)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
persist connection data to disk as json
r3614 def abort(self):
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
MinRK
cleanup per review by @fperez...
r5218 if self.url.startswith('127.'):
MinRK
add early shutdown detection, and public-ip message to ipcluster/ipengine...
r5205 self.log.fatal("""
If the controller and engines are not on the same machine,
you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
c.HubFactory.ip='*' # for all interfaces, internal and external
c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
or tunnel connections via ssh.
""")
MinRK
Refactor newparallel to use Config system...
r3604 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
MinRK
prep newparallel for rebase...
r3539 time.sleep(1)
MinRK
persist connection data to disk as json
r3614 sys.exit(255)
Bernardo B. Marques
remove all trailling spaces
r4872
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283 def _hb_monitor(self):
"""Callback to monitor the heartbeat from the controller"""
self._hb_listener.flush()
if self._hb_last_monitored > self._hb_last_pinged:
self._hb_missed_beats += 1
Jan Schulz
Change types of some config items and adjust code accordingly...
r8287 self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats)
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283 else:
Jan Schulz
Rework the heartbeat checking (configureable, period from hub)...
r8570 #self.log.debug("Heartbeat received (after missing %s beats).", self._hb_missed_beats)
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283 self._hb_missed_beats = 0
Jan Schulz
Rework the heartbeat checking (configureable, period from hub)...
r8570 if self._hb_missed_beats >= self.max_heartbeat_misses:
Jan Schulz
Change types of some config items and adjust code accordingly...
r8287 self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.",
Jan Schulz
Rework the heartbeat checking (configureable, period from hub)...
r8570 self.max_heartbeat_misses, self.hb_check_period)
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
self.loop.stop()
self._hb_last_monitored = time.time()
MinRK
prep newparallel for rebase...
r3539 def start(self):
MinRK
Refactor newparallel to use Config system...
r3604 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
dc.start()
MinRK
persist connection data to disk as json
r3614 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
self._abort_dc.start()
Jan Schulz
Monitor the heartbeat of the cluster controller...
r8283
MinRK
Refactor newparallel to use Config system...
r3604