##// END OF EJS Templates
Basic exception display in the notebook is working.
Basic exception display in the notebook is working.

File last commit:

r4283:efa1f33b
r4319:bffb1197
Show More
engine.py
174 lines | 6.8 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""A simple engine that talks to a controller over 0MQ.
it handles registration, etc. and launches a kernel
MinRK
cleanup pass
r3644 connected to the Controller's Schedulers.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Min RK
MinRK
prep newparallel for rebase...
r3539 """
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
resort imports in a cleaner order
r3631
MinRK
prep newparallel for rebase...
r3539 import sys
import time
import zmq
from zmq.eventloop import ioloop, zmqstream
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # internal
MinRK
enforce ascii identities in parallel code...
r4160 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode, CBytes
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # from IPython.utils.localinterfaces import LOCALHOST
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.controller.heartmonitor import Heart
from IPython.parallel.factory import RegistrationFactory
MinRK
cleanup per review...
r4161 from IPython.parallel.util import disambiguate_url, asbytes
MinRK
organize IPython.parallel into subpackages
r3673
MinRK
merge IPython.parallel.streamsession into IPython.zmq.session...
r4006 from IPython.zmq.session import Message
MinRK
eliminate relative imports
r3642 from .streamkernel import Kernel
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
class EngineFactory(RegistrationFactory):
    """IPython engine: register with a controller over 0MQ, then run a kernel.

    ``start()`` schedules a registration request together with a timeout
    callback.  When the controller answers, ``complete_registration()``
    connects the shell, control and iopub streams, optionally redirects
    stdout/stderr/displayhook, starts the ``Kernel`` and launches the
    heartbeat.
    """

    # ---- configurable traits -------------------------------------------

    out_stream_factory = Type(
        'IPython.zmq.iostream.OutStream',
        config=True,
        help=("The OutStream for handling stdout/err.\n"
              "Typically 'IPython.zmq.iostream.OutStream'"))

    display_hook_factory = Type(
        'IPython.zmq.displayhook.ZMQDisplayHook',
        config=True,
        help=("The class for handling displayhook.\n"
              "Typically 'IPython.zmq.displayhook.ZMQDisplayHook'"))

    location = Unicode(
        config=True,
        help=("The location (an IP address) of the controller. This is\n"
              "used for disambiguating URLs, to determine whether\n"
              "loopback should be used to connect or the public address."))

    timeout = CFloat(
        2,
        config=True,
        help=("The time (in seconds) to wait for the Controller to respond\n"
              "to registration requests before giving up."))

    # ---- non-configurable traits ---------------------------------------

    # Namespace handed to the Kernel as ``user_ns``.
    user_ns = Dict()
    # Engine id; set from the controller's registration reply.
    id = Int(allow_none=True)
    # Stream used to exchange registration messages with the controller.
    registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
    # Kernel created once registration has succeeded.
    kernel = Instance(Kernel)

    # ``ident`` is the text identity; ``bident`` is the value passed to
    # zmq.IDENTITY and is refreshed whenever ``ident`` changes.
    bident = CBytes()
    ident = Unicode()

    def _ident_changed(self, name, old, new):
        """Keep ``bident`` in sync with ``ident``.

        NOTE(review): relies on the traitlets ``_<trait>_changed`` naming
        convention to be invoked when ``ident`` is assigned -- confirm
        against the traitlets version in use.
        """
        self.bident = asbytes(new)

    def __init__(self, **kwargs):
        """Set the identity from the session and connect the registrar.

        All keyword arguments are forwarded to the parent constructor.
        """
        super(EngineFactory, self).__init__(**kwargs)
        # Assigning ident also refreshes bident (see _ident_changed).
        self.ident = self.session.session

        registration_socket = self.context.socket(zmq.XREQ)
        registration_socket.setsockopt(zmq.IDENTITY, self.bident)
        registration_socket.connect(self.url)
        self.registrar = zmqstream.ZMQStream(registration_socket, self.loop)

    def register(self):
        """send the registration_request"""
        self.log.info("Registering with controller at %s"%self.url)

        # The same identity is announced for every channel.
        my_ident = self.ident
        request = {
            'queue': my_ident,
            'heartbeat': my_ident,
            'control': my_ident,
        }

        # Install the reply handler before the request goes out.
        self.registrar.on_recv(self.complete_registration)
        self.session.send(self.registrar, "registration_request",
                          content=request)

    def complete_registration(self, msg):
        """Handle the controller's reply to the registration request.

        On an ``'ok'`` status the engine id is stored, the shell, control
        and iopub streams are created and connected, stdout/stderr and the
        displayhook are redirected when the corresponding factories are
        set, the ``Kernel`` is started and the heartbeat is launched.

        Raises ``Exception`` (after logging a fatal message) when the
        reply status is anything other than ``'ok'``.
        """
        # A reply arrived, so the pending timeout callback is cancelled.
        self._abort_dc.stop()

        context = self.context
        loop = self.loop
        identity = self.bident

        _, frames = self.session.feed_identities(msg)
        reply = Message(self.session.unserialize(frames))

        if reply.content.status != 'ok':
            failure = "Registration Failed: %s"%reply
            self.log.fatal(failure)
            raise Exception(failure)

        self.id = int(reply.content.id)

        def open_stream(socket_type):
            # Every engine socket carries the same zmq identity.
            new_stream = zmqstream.ZMQStream(context.socket(socket_type), loop)
            new_stream.setsockopt(zmq.IDENTITY, identity)
            return new_stream

        # Shell addresses (MUX, plus Task when the controller provides one).
        shell_addrs = [str(reply.content.mux)]
        task_addr = reply.content.task
        if task_addr:
            shell_addrs.append(str(task_addr))

        # A single shell stream is connected to every shell address,
        # instead of one XREP stream per address (the earlier two-socket
        # model).
        shell_stream = open_stream(zmq.XREP)
        shell_streams = [shell_stream]
        for shell_addr in shell_addrs:
            shell_stream.connect(disambiguate_url(shell_addr, self.location))

        # Control stream.
        control_addr = str(reply.content.control)
        control_stream = open_stream(zmq.XREP)
        control_stream.connect(disambiguate_url(control_addr, self.location))

        # IOPub stream.
        iopub_addr = reply.content.iopub
        iopub_stream = open_stream(zmq.PUB)
        iopub_stream.connect(disambiguate_url(iopub_addr, self.location))

        # Heartbeat addresses are read now and used after the kernel starts.
        heartbeat_addrs = reply.content.heartbeat

        # Redirect output streams and set a display hook, publishing on
        # iopub under per-engine topics.
        if self.out_stream_factory:
            make_out_stream = self.out_stream_factory
            sys.stdout = make_out_stream(self.session, iopub_stream, u'stdout')
            sys.stdout.topic = 'engine.%i.stdout'%self.id
            sys.stderr = make_out_stream(self.session, iopub_stream, u'stderr')
            sys.stderr.topic = 'engine.%i.stderr'%self.id
        if self.display_hook_factory:
            sys.displayhook = self.display_hook_factory(self.session,
                                                        iopub_stream)
            sys.displayhook.topic = 'engine.%i.pyout'%self.id

        self.kernel = Kernel(
            config=self.config,
            int_id=self.id,
            ident=self.ident,
            session=self.session,
            control_stream=control_stream,
            shell_streams=shell_streams,
            iopub_stream=iopub_stream,
            loop=loop,
            user_ns=self.user_ns,
            log=self.log)
        self.kernel.start()

        # Launch the heartbeat once the kernel is running.
        heart_urls = [str(disambiguate_url(addr, self.location))
                      for addr in heartbeat_addrs]
        heart = Heart(*heart_urls, heart_id=identity)
        heart.start()

        self.log.info("Completed registration with id %i"%self.id)

    def abort(self):
        """Give up on registration: log, notify the controller and exit.

        Sends an ``unregistration_request`` carrying the current ``id``,
        sleeps for one second (presumably so the message can be flushed
        -- TODO confirm) and terminates the process with exit status 255.
        """
        self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
        farewell = dict(id=self.id)
        self.session.send(self.registrar, "unregistration_request",
                          content=farewell)
        time.sleep(1)
        sys.exit(255)

    def start(self):
        """Schedule the registration request and its timeout on the loop.

        ``register`` is queued with a delay of 0; ``abort`` is queued with
        a delay of ``timeout * 1000`` (``timeout`` is documented in
        seconds, so the callback delay is presumably in milliseconds) and
        is kept in ``_abort_dc`` so ``complete_registration`` can cancel it.
        """
        registration_dc = ioloop.DelayedCallback(self.register, 0, self.loop)
        registration_dc.start()

        abort_delay = self.timeout*1000
        self._abort_dc = ioloop.DelayedCallback(self.abort, abort_delay,
                                                self.loop)
        self._abort_dc.start()
MinRK
Refactor newparallel to use Config system...
r3604