##// END OF EJS Templates
Backport PR #2926: Don't die if stderr/stdout do not support set_parent() #2925...
Backport PR #2926: Don't die if stderr/stdout do not support set_parent() #2925 If the user redirects the streams to a file for example, the kernel would raise an exception because file objects do not have set_parent defined, unlike ipython's OutStream. This happens even if the user does this in a spawned thread. closes #2925

File last commit:

r8079:b1241d50
r9971:f85e1f84
Show More
engine.py
242 lines | 9.8 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 """A simple engine that talks to a controller over 0MQ.
It handles registration, etc. and launches a kernel
MinRK
cleanup pass
r3644 connected to the Controller's Schedulers.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Min RK
MinRK
prep newparallel for rebase...
r3539 """
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
resort imports in a cleaner order
r3631
MinRK
prep newparallel for rebase...
r3539 import sys
import time
MinRK
add ssh tunneling to Engine...
r4585 from getpass import getpass
MinRK
prep newparallel for rebase...
r3539
import zmq
from zmq.eventloop import ioloop, zmqstream
MinRK
add ssh tunneling to Engine...
r4585 from IPython.external.ssh import tunnel
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # internal
MinRK
add ssh tunneling to Engine...
r4585 from IPython.utils.traitlets import (
MinRK
add Integer traitlet...
r5344 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
MinRK
add ssh tunneling to Engine...
r4585 )
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 from IPython.utils.py3compat import cast_bytes
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.controller.heartmonitor import Heart
from IPython.parallel.factory import RegistrationFactory
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 from IPython.parallel.util import disambiguate_url
MinRK
organize IPython.parallel into subpackages
r3673
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 from IPython.zmq.session import Message
MinRK
Backport PR #2214: use KernelApp.exec_lines/files in IPEngineApp...
r8079 from IPython.zmq.ipkernel import Kernel, IPKernelApp
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
class EngineFactory(RegistrationFactory):
    """IPython engine.

    Registers with the controller at ``self.url`` and, once the controller
    answers with status ``'ok'``, builds and starts a :class:`Kernel` whose
    sockets are connected to the addresses listed in the registration reply.
    Connections may optionally be tunneled over SSH (see ``sshserver`` /
    ``sshkey``).
    """

    # configurables:
    out_stream_factory = Type('IPython.zmq.iostream.OutStream', config=True,
        help="""The OutStream for handling stdout/err.
        Typically 'IPython.zmq.iostream.OutStream'""")
    display_hook_factory = Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
        help="""The class for handling displayhook.
        Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
    location = Unicode(config=True,
        help="""The location (an IP address) of the controller. This is
        used for disambiguating URLs, to determine whether
        loopback should be used to connect or the public address.""")
    timeout = CFloat(2, config=True,
        help="""The time (in seconds) to wait for the Controller to respond
        to registration requests before giving up.""")
    sshserver = Unicode(config=True,
        help="""The SSH server to use for tunneling connections to the Controller.""")
    sshkey = Unicode(config=True,
        help="""The SSH private key file to use when tunneling connections to the Controller.""")
    paramiko = Bool(sys.platform == 'win32', config=True,
        help="""Whether to use paramiko instead of openssh for tunnels.""")

    # not configurable:
    user_ns = Dict()
    id = Integer(allow_none=True)
    registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
    kernel = Instance(Kernel)

    # `ident` is the text identity; `bident` is its bytes form, which is what
    # gets handed to zmq.IDENTITY and to the heartbeat.
    bident = CBytes()
    ident = Unicode()

    def _ident_changed(self, name, old, new):
        """Mirror a new ``ident`` value into ``bident`` via ``cast_bytes``.

        NOTE(review): presumably invoked by traitlets whenever ``ident``
        changes (``_<trait>_changed`` naming) -- confirm against traitlets.
        """
        self.bident = cast_bytes(new)

    using_ssh = Bool(False)

    def __init__(self, **kwargs):
        """Initialise the factory and adopt the session id as ``ident``."""
        super(EngineFactory, self).__init__(**kwargs)
        self.ident = self.session.session

    def init_connector(self):
        """Construct the connection functions, which handle tunnels.

        Sets ``self.using_ssh`` (true when either ``sshkey`` or ``sshserver``
        is given) and, if only a key was given, derives ``sshserver`` from
        the host part of ``self.url``.  When SSH is in use and passwordless
        login fails, the user is prompted for a password once; the closures
        below reuse it.

        Returns
        -------
        (connect, maybe_tunnel) : tuple of callables
            ``connect(s, url)`` connects socket/stream ``s`` to ``url``,
            through a tunnel if SSH is in use.
            ``maybe_tunnel(url)`` opens a tunnel if SSH is in use and returns
            the url to connect to, without connecting anything itself.
        """
        self.using_ssh = bool(self.sshkey or self.sshserver)

        if self.sshkey and not self.sshserver:
            # We are using ssh directly to the controller, tunneling localhost to localhost
            self.sshserver = self.url.split('://')[1].split(':')[0]

        if self.using_ssh:
            if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
                password = False
            else:
                password = getpass("SSH Password for %s: "%self.sshserver)
        else:
            password = False

        def connect(s, url):
            url = disambiguate_url(url, self.location)
            if self.using_ssh:
                self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
                return tunnel.tunnel_connection(s, url, self.sshserver,
                            keyfile=self.sshkey, paramiko=self.paramiko,
                            password=password,
                )
            else:
                return s.connect(url)

        def maybe_tunnel(url):
            """like connect, but don't complete the connection (for use by heartbeat)"""
            url = disambiguate_url(url, self.location)
            if self.using_ssh:
                self.log.debug("Tunneling connection to %s via %s"%(url, self.sshserver))
                # only the forwarded url is needed by the caller
                url, _tunnelobj = tunnel.open_tunnel(url, self.sshserver,
                            keyfile=self.sshkey, paramiko=self.paramiko,
                            password=password,
                )
            return url
        return connect, maybe_tunnel

    def register(self):
        """send the registration_request

        Opens a DEALER socket identified by ``self.bident``, connects it to
        ``self.url`` (tunneled if configured), wraps it in a ZMQStream stored
        as ``self.registrar``, and arranges for the reply to be handled by
        :meth:`complete_registration`.
        """

        self.log.info("Registering with controller at %s"%self.url)
        ctx = self.context
        connect, maybe_tunnel = self.init_connector()
        reg = ctx.socket(zmq.DEALER)
        reg.setsockopt(zmq.IDENTITY, self.bident)
        connect(reg, self.url)
        self.registrar = zmqstream.ZMQStream(reg, self.loop)

        content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
        # hand the connector closures to the reply handler so it reuses the
        # same tunnel settings (and password) for the remaining sockets
        self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
        self.session.send(self.registrar, "registration_request", content=content)

    def complete_registration(self, msg, connect, maybe_tunnel):
        """Handle the controller's reply to the registration request.

        Parameters
        ----------
        msg : list
            Raw multipart message received on ``self.registrar``.
        connect, maybe_tunnel : callables
            The pair returned by :meth:`init_connector`.

        On status ``'ok'`` this starts the heartbeat, connects the shell,
        control and iopub sockets, optionally replaces ``sys.stdout``,
        ``sys.stderr`` and ``sys.displayhook``, then creates and starts the
        Kernel.

        Raises
        ------
        Exception
            If the reply status is anything other than ``'ok'``.
        """
        # a reply arrived, so cancel the registration-timeout callback
        self._abort_dc.stop()
        ctx = self.context
        loop = self.loop
        identity = self.bident
        idents, msg = self.session.feed_identities(msg)
        msg = Message(self.session.unserialize(msg))

        if msg.content.status == 'ok':
            self.id = int(msg.content.id)

            # launch heartbeat
            hb_addrs = msg.content.heartbeat

            # possibly forward hb ports with tunnels
            hb_addrs = [ maybe_tunnel(addr) for addr in hb_addrs ]
            heart = Heart(*map(str, hb_addrs), heart_id=identity)
            heart.start()

            # create Shell Streams (MUX, Task, etc.):
            queue_addr = msg.content.mux
            shell_addrs = [ str(queue_addr) ]
            task_addr = msg.content.task
            if task_addr:
                shell_addrs.append(str(task_addr))

            # Uncomment this to go back to two-socket model
            # shell_streams = []
            # for addr in shell_addrs:
            #     stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
            #     stream.setsockopt(zmq.IDENTITY, identity)
            #     stream.connect(disambiguate_url(addr, self.location))
            #     shell_streams.append(stream)

            # Now use only one shell stream for mux and tasks
            stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
            stream.setsockopt(zmq.IDENTITY, identity)
            shell_streams = [stream]
            for addr in shell_addrs:
                connect(stream, addr)
            # end single stream-socket

            # control stream:
            control_addr = str(msg.content.control)
            control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
            control_stream.setsockopt(zmq.IDENTITY, identity)
            connect(control_stream, control_addr)

            # create iopub stream:
            iopub_addr = msg.content.iopub
            iopub_socket = ctx.socket(zmq.PUB)
            iopub_socket.setsockopt(zmq.IDENTITY, identity)
            connect(iopub_socket, iopub_addr)

            # disable history:
            self.config.HistoryManager.hist_file = ':memory:'

            # Redirect input streams and set a display hook.
            if self.out_stream_factory:
                sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
                sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
                sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
                sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
            if self.display_hook_factory:
                sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
                sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)

            self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
                    control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
                    loop=loop, user_ns=self.user_ns, log=self.log)

            self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)

            # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
            app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
            app.init_profile_dir()
            app.init_code()

            self.kernel.start()
        else:
            self.log.fatal("Registration Failed: %s"%msg)
            raise Exception("Registration Failed: %s"%msg)

        self.log.info("Completed registration with id %i"%self.id)

    def abort(self):
        """Give up on registration: log why, notify the controller, and exit.

        Logs a fatal message, adds a configuration hint when the controller
        url points at a ``127.*`` address, sends an
        ``unregistration_request`` on ``self.registrar``, waits one second
        and terminates the process with exit status 255.
        """
        self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
        # self.url carries a scheme (init_connector splits it on '://'), e.g.
        # 'tcp://127.0.0.1:PORT', so testing the raw url against '127.' could
        # never match.  Strip the scheme first; a url without a scheme is
        # left unchanged by the split.
        host = self.url.split('://', 1)[-1]
        if host.startswith('127.'):
            self.log.fatal("""
            If the controller and engines are not on the same machine,
            you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
                c.HubFactory.ip='*' # for all interfaces, internal and external
                c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
            or tunnel connections via ssh.
            """)
        self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
        # pause before exiting -- presumably to let the message above be sent
        time.sleep(1)
        sys.exit(255)

    def start(self):
        """Schedule registration and its timeout on ``self.loop``.

        ``register`` is queued with a delay of 0; ``abort`` is queued after
        ``self.timeout`` seconds (scaled by 1000 for the callback delay) and
        is cancelled by :meth:`complete_registration` when a reply arrives.
        """
        dc = ioloop.DelayedCallback(self.register, 0, self.loop)
        dc.start()
        self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
        self._abort_dc.start()
MinRK
Refactor newparallel to use Config system...
r3604