##// END OF EJS Templates
use Popen instead of getstatusoutput to check for libedit....
use Popen instead of getstatusoutput to check for libedit. getstatusoutput uses os.popen, and is vulnerable to EINTR weirdness in environments such as gdb or PyQt. Exponential falloff is also used, to prevent waiting forever or firing requests too fast, though I haven't had it fire more than once after moving to Popen. closes gh-473

File last commit:

r3673:b9f54806
r3906:cf26ce8a
Show More
engine.py
156 lines | 6.2 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""A simple engine that talks to a controller over 0MQ.
It handles registration, etc., and launches a kernel
MinRK
cleanup pass
r3644 connected to the Controller's Schedulers.
MinRK
prep newparallel for rebase...
r3539 """
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
resort imports in a cleaner order
r3631
MinRK
prep newparallel for rebase...
r3539 import sys
import time
import zmq
from zmq.eventloop import ioloop, zmqstream
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # internal
MinRK
persist connection data to disk as json
r3614 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # from IPython.utils.localinterfaces import LOCALHOST
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel.controller.heartmonitor import Heart
from IPython.parallel.factory import RegistrationFactory
from IPython.parallel.streamsession import Message
from IPython.parallel.util import disambiguate_url
MinRK
eliminate relative imports
r3642 from .streamkernel import Kernel
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
class EngineFactory(RegistrationFactory):
    """IPython engine.

    Sends a registration request to the controller over a dedicated
    registrar stream and, once the controller answers with status 'ok',
    connects the shell/control/iopub streams, starts a Kernel and starts
    a Heart.  If no reply arrives within `timeout` seconds, `abort` is
    called and the process exits.
    """

    # configurables:
    user_ns = Dict(config=True)
    out_stream_factory = Type('IPython.zmq.iostream.OutStream', config=True)
    display_hook_factory = Type('IPython.zmq.displayhook.DisplayHook', config=True)
    location = Str(config=True)
    # seconds to wait for the registration reply (multiplied by 1000 in start())
    timeout = CFloat(2, config=True)

    # not configurable:
    id = Int(allow_none=True)
    registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
    kernel = Instance(Kernel)

    def __init__(self, **kwargs):
        """Create the registrar stream: an XREQ socket connected to self.url."""
        super(EngineFactory, self).__init__(**kwargs)

        reg_socket = self.context.socket(zmq.XREQ)
        reg_socket.setsockopt(zmq.IDENTITY, self.ident)
        reg_socket.connect(self.url)
        self.registrar = zmqstream.ZMQStream(reg_socket, self.loop)

    def register(self):
        """send the registration_request"""
        self.log.info("registering")
        # the same identity is advertised for queue, heartbeat and control
        request = {
            'queue': self.ident,
            'heartbeat': self.ident,
            'control': self.ident,
        }
        # the reply is handled asynchronously by complete_registration
        self.registrar.on_recv(self.complete_registration)
        self.session.send(self.registrar, "registration_request", content=request)

    def complete_registration(self, msg):
        """Handle the registration reply.

        On status 'ok': connect all streams, optionally redirect
        stdout/stderr/displayhook to the iopub stream, then start the
        Kernel and the Heart.  On any other status: log and raise Exception.
        """
        # a reply arrived, so cancel the pending registration timeout
        self._abort_dc.stop()

        context = self.context
        loop = self.loop
        identity = self.ident

        _idents, frames = self.session.feed_identities(msg)
        reply = Message(self.session.unpack_message(frames))
        content = reply.content

        if content.status != 'ok':
            failure = "Registration Failed: %s" % reply
            self.log.fatal(failure)
            raise Exception(failure)

        self.id = int(content.id)

        # shell addresses (MUX, and Task when the controller provides one)
        shell_addrs = [str(content.mux)]
        task_addr = content.task
        if task_addr:
            shell_addrs.append(str(task_addr))

        # Single-socket model: one XREP shell stream is connected to every
        # shell address (the earlier two-socket model created one stream
        # per address instead).
        shell_stream = zmqstream.ZMQStream(context.socket(zmq.XREP), loop)
        shell_stream.setsockopt(zmq.IDENTITY, identity)
        shell_streams = [shell_stream]
        for shell_addr in shell_addrs:
            shell_stream.connect(disambiguate_url(shell_addr, self.location))

        # control stream
        control_url = disambiguate_url(str(content.control), self.location)
        control_stream = zmqstream.ZMQStream(context.socket(zmq.XREP), loop)
        control_stream.setsockopt(zmq.IDENTITY, identity)
        control_stream.connect(control_url)

        # iopub stream
        iopub_stream = zmqstream.ZMQStream(context.socket(zmq.PUB), loop)
        iopub_stream.setsockopt(zmq.IDENTITY, identity)
        iopub_stream.connect(disambiguate_url(content.iopub, self.location))

        # heartbeat addresses; the Heart itself is started after the kernel
        hb_addrs = content.heartbeat

        # redirect output streams and set a display hook, when configured
        if self.out_stream_factory:
            sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
            sys.stdout.topic = 'engine.%i.stdout' % self.id
            sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
            sys.stderr.topic = 'engine.%i.stderr' % self.id
        if self.display_hook_factory:
            sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
            sys.displayhook.topic = 'engine.%i.pyout' % self.id

        self.kernel = Kernel(
            config=self.config,
            int_id=self.id,
            ident=self.ident,
            session=self.session,
            control_stream=control_stream,
            shell_streams=shell_streams,
            iopub_stream=iopub_stream,
            loop=loop,
            user_ns=self.user_ns,
            logname=self.log.name,
        )
        self.kernel.start()

        heart_urls = [str(disambiguate_url(hb_addr, self.location)) for hb_addr in hb_addrs]
        heart = Heart(*heart_urls, heart_id=identity)
        heart.start()

        self.log.info("Completed registration with id %i" % self.id)

    def abort(self):
        """Registration timed out: send an unregistration_request and exit with status 255."""
        self.log.fatal("Registration timed out")
        self.session.send(self.registrar, "unregistration_request", content={'id': self.id})
        # pause one second before exiting
        time.sleep(1)
        sys.exit(255)

    def start(self):
        """Schedule `register` immediately and `abort` after `timeout` seconds."""
        register_dc = ioloop.DelayedCallback(self.register, 0, self.loop)
        register_dc.start()
        # kept on self so that complete_registration can cancel it
        self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
        self._abort_dc.start()
MinRK
Refactor newparallel to use Config system...
r3604