##// END OF EJS Templates
update parallel apps to use ProfileDir
update parallel apps to use ProfileDir

File last commit:

r3990:080d3c71
r3992:54eceb8d
Show More
engine.py
165 lines | 6.7 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
#!/usr/bin/env python
"""A simple engine that talks to a controller over 0MQ.
It handles registration, etc. and launches a kernel
connected to the Controller's Schedulers.
"""
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
resort imports in a cleaner order
r3631
MinRK
prep newparallel for rebase...
r3539 import sys
import time
import zmq
from zmq.eventloop import ioloop, zmqstream
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # internal
MinRK
cleanup parallel traits...
r3988 from IPython.utils.traitlets import Instance, Dict, Int, Type, CFloat, Unicode
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # from IPython.utils.localinterfaces import LOCALHOST
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.controller.heartmonitor import Heart
from IPython.parallel.factory import RegistrationFactory
from IPython.parallel.streamsession import Message
from IPython.parallel.util import disambiguate_url
MinRK
eliminate relative imports
r3642 from .streamkernel import Kernel
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
class EngineFactory(RegistrationFactory):
    """IPython engine.

    Connects a registration socket to the controller at ``self.url``, sends
    a ``registration_request`` and, when the reply reports ``status == 'ok'``,
    creates the shell/control/iopub streams, starts a :class:`Kernel` on them
    and starts a :class:`Heart` for the heartbeat addresses in the reply.
    """

    # configurables:
    out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
        help="""The OutStream for handling stdout/err.
        Typically 'IPython.zmq.iostream.OutStream'""")
    display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True,
        help="""The class for handling displayhook.
        Typically 'IPython.zmq.displayhook.DisplayHook'""")
    location=Unicode(config=True,
        help="""The location (an IP address) of the controller.  This is
        used for disambiguating URLs, to determine whether
        loopback should be used to connect or the public address.""")
    timeout=CFloat(2,config=True,
        help="""The time (in seconds) to wait for the Controller to respond
        to registration requests before giving up.""")

    # not configurable:
    user_ns=Dict()
    id=Int(allow_none=True)
    registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
    kernel=Instance(Kernel)

    def __init__(self, **kwargs):
        """Create the registration socket and wrap it in a ZMQStream.

        All keyword arguments are forwarded to ``RegistrationFactory``.
        """
        super(EngineFactory, self).__init__(**kwargs)
        # The session's id doubles as the zmq IDENTITY of every socket
        # this engine opens (registration, shell, control, iopub).
        self.ident = self.session.session
        ctx = self.context

        reg = ctx.socket(zmq.XREQ)
        reg.setsockopt(zmq.IDENTITY, self.ident)
        reg.connect(self.url)
        self.registrar = zmqstream.ZMQStream(reg, self.loop)

    def register(self):
        """send the registration_request"""

        self.log.info("registering")
        # queue, heartbeat and control are all advertised under the same identity
        content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
        # install the reply handler before sending the request
        self.registrar.on_recv(self.complete_registration)
        # print (self.session.key)
        self.session.send(self.registrar, "registration_request",content=content)

    def complete_registration(self, msg):
        """Handle the controller's reply to the registration request.

        Cancels the pending registration timeout, then either raises
        (``status`` other than ``'ok'``) or connects the engine's streams and
        starts the kernel and heartbeat.

        :param msg: the raw message delivered by ``self.registrar``.
        :raises Exception: if the reply's ``status`` is not ``'ok'``.
        """
        # print msg
        # A reply arrived, so the timeout callback armed in start() must not fire.
        self._abort_dc.stop()
        ctx = self.context
        loop = self.loop
        identity = self.ident

        # the identity prefix is not needed here; only the message body is used
        _idents, msg = self.session.feed_identities(msg)
        msg = Message(self.session.unpack_message(msg))

        if msg.content.status != 'ok':
            self.log.fatal("Registration Failed: %s"%msg)
            raise Exception("Registration Failed: %s"%msg)

        self.id = int(msg.content.id)

        # create Shell Streams (MUX, Task, etc.):
        queue_addr = msg.content.mux
        shell_addrs = [ str(queue_addr) ]
        task_addr = msg.content.task
        if task_addr:
            shell_addrs.append(str(task_addr))

        # Uncomment this to go back to two-socket model
        # shell_streams = []
        # for addr in shell_addrs:
        #     stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
        #     stream.setsockopt(zmq.IDENTITY, identity)
        #     stream.connect(disambiguate_url(addr, self.location))
        #     shell_streams.append(stream)

        # Now use only one shell stream for mux and tasks
        stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
        stream.setsockopt(zmq.IDENTITY, identity)
        shell_streams = [stream]
        for addr in shell_addrs:
            stream.connect(disambiguate_url(addr, self.location))
        # end single stream-socket

        # control stream:
        control_addr = str(msg.content.control)
        control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
        control_stream.setsockopt(zmq.IDENTITY, identity)
        control_stream.connect(disambiguate_url(control_addr, self.location))

        # create iopub stream:
        iopub_addr = msg.content.iopub
        iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
        iopub_stream.setsockopt(zmq.IDENTITY, identity)
        iopub_stream.connect(disambiguate_url(iopub_addr, self.location))

        # launch heartbeat
        hb_addrs = msg.content.heartbeat
        # print (hb_addrs)

        # # Redirect input streams and set a display hook.
        # NOTE: this replaces the process-wide sys.stdout/sys.stderr/sys.displayhook.
        if self.out_stream_factory:
            sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
            sys.stdout.topic = 'engine.%i.stdout'%self.id
            sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
            sys.stderr.topic = 'engine.%i.stderr'%self.id
        if self.display_hook_factory:
            sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
            sys.displayhook.topic = 'engine.%i.pyout'%self.id

        self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
                control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
                loop=loop, user_ns = self.user_ns, log=self.log)
        self.kernel.start()

        # the heart is started only after the kernel has been started
        hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
        heart = Heart(*map(str, hb_addrs), heart_id=identity)
        heart.start()

        self.log.info("Completed registration with id %i"%self.id)

    def abort(self):
        """Give up on registration: log, notify the controller, and exit.

        Scheduled by :meth:`start` to run after ``self.timeout`` seconds and
        cancelled by :meth:`complete_registration` when a reply arrives.
        Terminates the process with exit status 255.
        """
        self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
        # NOTE(review): self.id is only assigned in complete_registration, so
        # at this point it presumably still holds the trait's default -- confirm
        # the controller tolerates that in an unregistration_request.
        self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
        # pause before exiting -- presumably to give the message time to be
        # sent; TODO confirm
        time.sleep(1)
        sys.exit(255)

    def start(self):
        """Schedule registration and arm the registration timeout on the loop."""
        # run register() from the event loop (delay 0)
        dc = ioloop.DelayedCallback(self.register, 0, self.loop)
        dc.start()
        # timeout is documented in seconds; scaled by 1000 for DelayedCallback
        self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
        self._abort_dc.start()
MinRK
Refactor newparallel to use Config system...
r3604