##// END OF EJS Templates
Improvements to dependency handling...
Improvements to dependency handling Specifically: * add 'success_only' switch to Dependencies * Scheduler handles some cases where Dependencies are impossible to meet.

File last commit:

r3605:2d79d3e4
r3607:d51586b9
Show More
engine.py
141 lines | 5.0 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""A simple engine that talks to a controller over 0MQ.
it handles registration, etc. and launches a kernel
connected to the Controller's queue(s).
"""
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
prep newparallel for rebase...
r3539 import sys
import time
import uuid
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 import logging
MinRK
control channel progress
r3540 from pprint import pprint
MinRK
prep newparallel for rebase...
r3539
import zmq
from zmq.eventloop import ioloop, zmqstream
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # internal
from IPython.config.configurable import Configurable
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # from IPython.utils.localinterfaces import LOCALHOST
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569
MinRK
Refactor newparallel to use Config system...
r3604 from factory import RegistrationFactory
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 from streamsession import Message
from streamkernel import Kernel
MinRK
prep newparallel for rebase...
r3539 import heartmonitor
def printer(*msg):
MinRK
Refactor newparallel to use Config system...
r3604 # print (logging.handlers, file=sys.__stdout__)
logging.info(str(msg))
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
r3604 class EngineFactory(RegistrationFactory):
MinRK
prep newparallel for rebase...
r3539 """IPython engine"""
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # configurables:
MinRK
Refactor newparallel to use Config system...
r3604 user_ns=Dict(config=True)
out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
# not configurable:
id=Int(allow_none=True)
registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
kernel=Instance(Kernel)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603
def __init__(self, **kwargs):
MinRK
Refactor newparallel to use Config system...
r3604 super(EngineFactory, self).__init__(**kwargs)
ctx = self.context
reg = ctx.socket(zmq.PAIR)
reg.setsockopt(zmq.IDENTITY, self.ident)
reg.connect(self.url)
self.registrar = zmqstream.ZMQStream(reg, self.loop)
MinRK
prep newparallel for rebase...
r3539
def register(self):
MinRK
Refactor newparallel to use Config system...
r3604 """send the registration_request"""
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
r3604 logging.info("registering")
MinRK
control channel progress
r3540 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
MinRK
prep newparallel for rebase...
r3539 self.registrar.on_recv(self.complete_registration)
MinRK
added exec_key and fixed client.shutdown
r3575 # print (self.session.key)
MinRK
prep newparallel for rebase...
r3539 self.session.send(self.registrar, "registration_request",content=content)
def complete_registration(self, msg):
# print msg
MinRK
Refactor newparallel to use Config system...
r3604 ctx = self.context
loop = self.loop
identity = self.ident
print (identity)
MinRK
prep newparallel for rebase...
r3539 idents,msg = self.session.feed_identities(msg)
msg = Message(self.session.unpack_message(msg))
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
prep newparallel for rebase...
r3539 if msg.content.status == 'ok':
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 self.id = int(msg.content.id)
MinRK
Refactor newparallel to use Config system...
r3604
# create Shell Streams (MUX, Task, etc.):
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 queue_addr = msg.content.mux
shell_addrs = [ str(queue_addr) ]
MinRK
prep newparallel for rebase...
r3539 task_addr = msg.content.task
if task_addr:
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 shell_addrs.append(str(task_addr))
MinRK
Refactor newparallel to use Config system...
r3604 shell_streams = []
for addr in shell_addrs:
stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
stream.setsockopt(zmq.IDENTITY, identity)
stream.connect(addr)
shell_streams.append(stream)
# control stream:
control_addr = str(msg.content.control)
control_stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
control_stream.setsockopt(zmq.IDENTITY, identity)
control_stream.connect(control_addr)
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
r3604 # create iopub stream:
iopub_addr = msg.content.iopub
iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
iopub_stream.setsockopt(zmq.IDENTITY, identity)
iopub_stream.connect(iopub_addr)
# launch heartbeat
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 hb_addrs = msg.content.heartbeat
MinRK
Refactor newparallel to use Config system...
r3604 # print (hb_addrs)
# # Redirect input streams and set a display hook.
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 if self.out_stream_factory:
sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
sys.stdout.topic = 'engine.%i.stdout'%self.id
sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
sys.stderr.topic = 'engine.%i.stderr'%self.id
if self.display_hook_factory:
sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
sys.displayhook.topic = 'engine.%i.pyout'%self.id
MinRK
Refactor newparallel to use Config system...
r3604
self.kernel = Kernel(int_id=self.id, ident=self.ident, session=self.session,
control_stream=control_stream,
shell_streams=shell_streams, iopub_stream=iopub_stream, loop=loop,
user_ns = self.user_ns, config=self.config)
self.kernel.start()
heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # ioloop.DelayedCallback(heart.start, 1000, self.loop).start()
MinRK
Refactor newparallel to use Config system...
r3604 heart.start()
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569
MinRK
prep newparallel for rebase...
r3539 else:
MinRK
Refactor newparallel to use Config system...
r3604 logging.error("Registration Failed: %s"%msg)
MinRK
prep newparallel for rebase...
r3539 raise Exception("Registration Failed: %s"%msg)
MinRK
Refactor newparallel to use Config system...
r3604 logging.info("Completed registration with id %i"%self.id)
MinRK
prep newparallel for rebase...
r3539
def unregister(self):
MinRK
Refactor newparallel to use Config system...
r3604 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
MinRK
prep newparallel for rebase...
r3539 time.sleep(1)
sys.exit(0)
def start(self):
MinRK
Refactor newparallel to use Config system...
r3604 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
dc.start()