engine.py
301 lines
| 12.6 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""A simple engine that talks to a controller over 0MQ.
It handles registration, etc. and launches a kernel
connected to the Controller's Schedulers.
"""
MinRK
|
r16568 | |||
# Copyright (c) IPython Development Team. | ||||
# Distributed under the terms of the Modified BSD License. | ||||
MinRK
|
r3660 | |||
MinRK
|
r3552 | from __future__ import print_function | ||
MinRK
|
r3631 | |||
MinRK
|
r3539 | import sys | ||
import time | ||||
MinRK
|
r4585 | from getpass import getpass | ||
MinRK
|
r3539 | |||
import zmq | ||||
from zmq.eventloop import ioloop, zmqstream | ||||
MinRK
|
r12591 | from IPython.utils.localinterfaces import localhost | ||
MinRK
|
r4585 | from IPython.utils.traitlets import ( | ||
MinRK
|
r17054 | Instance, Dict, Integer, Type, Float, Unicode, CBytes, Bool | ||
MinRK
|
r4585 | ) | ||
MinRK
|
r6813 | from IPython.utils.py3compat import cast_bytes | ||
MinRK
|
r3569 | |||
MinRK
|
r3673 | from IPython.parallel.controller.heartmonitor import Heart | ||
from IPython.parallel.factory import RegistrationFactory | ||||
MinRK
|
r6813 | from IPython.parallel.util import disambiguate_url | ||
MinRK
|
r3673 | |||
MinRK
|
r9372 | from IPython.kernel.zmq.session import Message | ||
Thomas Kluyver
|
r17095 | from IPython.kernel.zmq.ipkernel import IPythonKernel as Kernel | ||
MinRK
|
r9372 | from IPython.kernel.zmq.kernelapp import IPKernelApp | ||
MinRK
|
r3539 | |||
MinRK
|
class EngineFactory(RegistrationFactory):
    """IPython engine.

    Sends a registration request to the controller and, once the controller
    replies with status 'ok', connects the heartbeat, shell, control and
    iopub channels and starts a kernel on them.
    """

    # configurables:
    out_stream_factory=Type('IPython.kernel.zmq.iostream.OutStream', config=True,
        help="""The OutStream for handling stdout/err.
        Typically 'IPython.kernel.zmq.iostream.OutStream'""")
    display_hook_factory=Type('IPython.kernel.zmq.displayhook.ZMQDisplayHook', config=True,
        help="""The class for handling displayhook.
        Typically 'IPython.kernel.zmq.displayhook.ZMQDisplayHook'""")
    location=Unicode(config=True,
        help="""The location (an IP address) of the controller.  This is
        used for disambiguating URLs, to determine whether
        loopback should be used to connect or the public address.""")
    timeout=Float(5.0, config=True,
        help="""The time (in seconds) to wait for the Controller to respond
        to registration requests before giving up.""")
    max_heartbeat_misses=Integer(50, config=True,
        help="""The maximum number of times a check for the heartbeat ping of a
        controller can be missed before shutting down the engine.

        If set to 0, the check is disabled.""")
    sshserver=Unicode(config=True,
        help="""The SSH server to use for tunneling connections to the Controller.""")
    sshkey=Unicode(config=True,
        help="""The SSH private key file to use when tunneling connections to the Controller.""")
    paramiko=Bool(sys.platform == 'win32', config=True,
        help="""Whether to use paramiko instead of openssh for tunnels.""")

    @property
    def tunnel_mod(self):
        """The ``zmq.ssh.tunnel`` module, imported on first access."""
        from zmq.ssh import tunnel
        return tunnel

    # not configurable:
    connection_info = Dict()
    user_ns = Dict()
    id = Integer(allow_none=True)
    registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
    kernel = Instance(Kernel)
    hb_check_period=Integer()

    # States for the heartbeat monitoring
    # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that
    # during the first check no "missed" ping is reported. Must be floats for Python 3 compatibility.
    _hb_last_pinged = 0.0
    _hb_last_monitored = 0.0
    _hb_missed_beats = 0
    # The zmq Stream which receives the pings from the Heart
    _hb_listener = None

    bident = CBytes()
    ident = Unicode()

    def _ident_changed(self, name, old, new):
        """Keep the bytes identity ``bident`` in sync with ``ident``."""
        self.bident = cast_bytes(new)

    using_ssh=Bool(False)

    def __init__(self, **kwargs):
        """Initialise the factory and use the session id as the engine ident."""
        super(EngineFactory, self).__init__(**kwargs)
        self.ident = self.session.session

    def init_connector(self):
        """Construct the connection functions, which handle tunnels.

        Returns
        -------
        (connect, maybe_tunnel) : tuple of callables
            ``connect(s, url)`` connects socket/stream ``s`` to ``url``,
            through an SSH tunnel when SSH is in use.
            ``maybe_tunnel(url)`` only opens the tunnel (if any) and returns
            the url to connect to, without connecting anything.
        """
        self.using_ssh = bool(self.sshkey or self.sshserver)

        if self.sshkey and not self.sshserver:
            # We are using ssh directly to the controller, tunneling localhost to localhost
            self.sshserver = self.url.split('://')[1].split(':')[0]

        # False means "no password needed"; only prompt when SSH is in use
        # and a passwordless login is not possible.
        password = False
        if self.using_ssh and not self.tunnel_mod.try_passwordless_ssh(
                self.sshserver, self.sshkey, self.paramiko):
            password = getpass("SSH Password for %s: "%self.sshserver)

        def connect(s, url):
            url = disambiguate_url(url, self.location)
            if not self.using_ssh:
                return s.connect(url)
            self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
            return self.tunnel_mod.tunnel_connection(s, url, self.sshserver,
                        keyfile=self.sshkey, paramiko=self.paramiko,
                        password=password,
            )

        def maybe_tunnel(url):
            """like connect, but don't complete the connection (for use by heartbeat)"""
            url = disambiguate_url(url, self.location)
            if self.using_ssh:
                self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
                # only the forwarded url is needed here, not the tunnel handle
                url, _tunnel = self.tunnel_mod.open_tunnel(url, self.sshserver,
                            keyfile=self.sshkey, paramiko=self.paramiko,
                            password=password,
                )
            return str(url)

        return connect, maybe_tunnel

    def register(self):
        """send the registration_request"""
        self.log.info("Registering with controller at %s", self.url)
        ctx = self.context
        connect, maybe_tunnel = self.init_connector()

        reg = ctx.socket(zmq.DEALER)
        reg.setsockopt(zmq.IDENTITY, self.bident)
        connect(reg, self.url)
        self.registrar = zmqstream.ZMQStream(reg, self.loop)

        content = dict(uuid=self.ident)
        # The reply handler needs the connector functions to set up the
        # remaining channels, so bind them into the callback.
        self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
        self.session.send(self.registrar, "registration_request", content=content)

    def _report_ping(self, msg):
        """Callback for when the heartmonitor.Heart receives a ping"""
        self._hb_last_pinged = time.time()

    def complete_registration(self, msg, connect, maybe_tunnel):
        """Handle the controller's reply to the registration request.

        Parameters
        ----------
        msg : the reply as received on the registrar stream
        connect, maybe_tunnel : callables
            The functions returned by `init_connector`.

        Raises
        ------
        Exception
            If the reply's status is not 'ok'.
        """
        # A reply arrived, so cancel the registration timeout.
        self._abort_dc.stop()
        ctx = self.context
        loop = self.loop
        identity = self.bident
        _idents, msg = self.session.feed_identities(msg)
        msg = self.session.unserialize(msg)
        content = msg['content']
        info = self.connection_info

        def url(key):
            """get zmq url for given channel"""
            return str(info["interface"] + ":%i" % info[key])

        if content['status'] != 'ok':
            self.log.fatal("Registration Failed: %s", msg)
            raise Exception("Registration Failed: %s"%msg)

        self.id = int(content['id'])

        # launch heartbeat
        # possibly forward hb ports with tunnels
        hb_ping = maybe_tunnel(url('hb_ping'))
        hb_pong = maybe_tunnel(url('hb_pong'))

        hb_monitor = None
        if self.max_heartbeat_misses > 0:
            # Add a monitor socket which will record the last time a ping was seen
            mon = self.context.socket(zmq.SUB)
            mport = mon.bind_to_random_port('tcp://%s' % localhost())
            mon.setsockopt(zmq.SUBSCRIBE, b"")
            self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
            self._hb_listener.on_recv(self._report_ping)
            hb_monitor = "tcp://%s:%i" % (localhost(), mport)

        heart = Heart(hb_ping, hb_pong, hb_monitor, heart_id=identity)
        heart.start()

        # create Shell Connections (MUX, Task, etc.):
        shell_addrs = url('mux'), url('task')

        # Use only one shell stream for mux and tasks
        stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
        stream.setsockopt(zmq.IDENTITY, identity)
        shell_streams = [stream]
        for addr in shell_addrs:
            connect(stream, addr)

        # control stream:
        control_addr = url('control')
        control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
        control_stream.setsockopt(zmq.IDENTITY, identity)
        connect(control_stream, control_addr)

        # create iopub stream:
        iopub_addr = url('iopub')
        iopub_socket = ctx.socket(zmq.PUB)
        iopub_socket.setsockopt(zmq.IDENTITY, identity)
        connect(iopub_socket, iopub_addr)

        # disable history:
        self.config.HistoryManager.hist_file = ':memory:'

        # Redirect input streams and set a display hook.
        if self.out_stream_factory:
            sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
            sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
            sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
            sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
        if self.display_hook_factory:
            sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
            sys.displayhook.topic = cast_bytes('engine.%i.execute_result' % self.id)

        self.kernel = Kernel(parent=self, int_id=self.id, ident=self.ident, session=self.session,
                control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
                loop=loop, user_ns=self.user_ns, log=self.log)

        self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)

        # periodically check the heartbeat pings of the controller
        # Should be started here and not in "start()" so that the right period can be taken
        # from the hubs HeartBeatMonitor.period
        if self.max_heartbeat_misses > 0:
            # Use a slightly bigger check period than the hub signal period to not warn unnecessary
            self.hb_check_period = int(content['hb_period'])+10
            self.log.info("Starting to monitor the heartbeat signal from the hub every %i ms." , self.hb_check_period)
            self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop)
            self._hb_reporter.start()
        else:
            self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.")

        # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
        app = IPKernelApp(parent=self, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
        app.init_profile_dir()
        app.init_code()

        self.kernel.start()

        self.log.info("Completed registration with id %i", self.id)

    def abort(self):
        """Give up on registration: log the timeout, unregister and exit.

        Exits the process with status 255.
        """
        self.log.fatal("Registration timed out after %.1f seconds", self.timeout)
        # self.url carries a transport prefix (e.g. 'tcp://127.0.0.1:1234'),
        # so strip it before checking for a loopback address; testing the
        # raw url with startswith('127.') can never match.
        host = self.url.split('://', 1)[-1]
        if host.startswith('127.'):
            self.log.fatal("""
            If the controller and engines are not on the same machine,
            you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
                c.HubFactory.ip='*' # for all interfaces, internal and external
                c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
            or tunnel connections via ssh.
            """)
        self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
        time.sleep(1)
        sys.exit(255)

    def _hb_monitor(self):
        """Callback to monitor the heartbeat from the controller"""
        # Process any pending pings first so _hb_last_pinged is up to date.
        self._hb_listener.flush()
        if self._hb_last_monitored > self._hb_last_pinged:
            # no ping was recorded since the previous check
            self._hb_missed_beats += 1
            self.log.warning("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats)
        else:
            self._hb_missed_beats = 0

        if self._hb_missed_beats >= self.max_heartbeat_misses:
            self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.",
                           self.max_heartbeat_misses, self.hb_check_period)
            self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
            self.loop.stop()

        self._hb_last_monitored = time.time()

    def start(self):
        """Schedule registration on the loop, plus the abort timeout.

        `register` is scheduled immediately; `abort` fires after
        ``timeout`` seconds unless it is stopped first.
        """
        dc = ioloop.DelayedCallback(self.register, 0, self.loop)
        dc.start()
        self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
        self._abort_dc.start()
Jan Schulz
|
r8283 | |||
MinRK
|
r3604 | |||