engine.py
147 lines
| 5.5 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
"""A simple engine that talks to a controller over 0MQ. | ||||
it handles registration, etc. and launches a kernel | ||||
connected to the Controller's queue(s). | ||||
""" | ||||
MinRK
|
r3552 | from __future__ import print_function | ||
MinRK
|
r3631 | |||
import logging | ||||
MinRK
|
r3539 | import sys | ||
import time | ||||
import uuid | ||||
MinRK
|
r3540 | from pprint import pprint | ||
MinRK
|
r3539 | |||
import zmq | ||||
from zmq.eventloop import ioloop, zmqstream | ||||
MinRK
|
r3603 | # internal | ||
from IPython.config.configurable import Configurable | ||||
MinRK
|
r3614 | from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat | ||
MinRK
|
r3603 | # from IPython.utils.localinterfaces import LOCALHOST | ||
MinRK
|
r3569 | |||
MinRK
|
r3631 | import heartmonitor | ||
MinRK
|
r3604 | from factory import RegistrationFactory | ||
MinRK
|
r3605 | from streamkernel import Kernel | ||
MinRK
|
r3631 | from streamsession import Message | ||
from util import disambiguate_url | ||||
MinRK
|
r3539 | |||
def printer(*msg):
    """Log the given message parts at INFO level.

    Intended as a simple debugging callback: whatever positional arguments
    it receives are rendered with ``str()`` on the argument tuple and logged.

    The previous body referenced ``self.log`` although this is a plain
    module-level function with no ``self``, so every call raised
    ``NameError``.  A module-level logger is used instead.
    """
    logging.getLogger(__name__).info(str(msg))
MinRK
|
r3539 | |||
MinRK
|
class EngineFactory(RegistrationFactory):
    """IPython engine.

    Sends a registration request to the controller over the ``registrar``
    stream.  When the controller replies with ``status == 'ok'`` the engine
    connects its shell, control and iopub streams, starts a `Kernel` on
    them and launches the heartbeat.  If no reply arrives before the
    timeout fires, the engine aborts and exits.
    """

    # configurables:
    user_ns=Dict(config=True)
    out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
    display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
    # second argument handed to disambiguate_url() for every controller address
    location=Str(config=True)
    # registration timeout; multiplied by 1000 when the abort callback is
    # scheduled, i.e. expressed in seconds if DelayedCallback takes ms
    timeout=CFloat(2,config=True)

    # not configurable:
    id=Int(allow_none=True)
    registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
    kernel=Instance(Kernel)

    def __init__(self, **kwargs):
        """Create the registration socket and wrap it in a ZMQStream."""
        super(EngineFactory, self).__init__(**kwargs)
        reg = self.context.socket(zmq.PAIR)
        reg.setsockopt(zmq.IDENTITY, self.ident)
        reg.connect(self.url)
        self.registrar = zmqstream.ZMQStream(reg, self.loop)

    def register(self):
        """send the registration_request"""
        self.log.info("registering")
        # the same identity is advertised for the queue, heartbeat and control channels
        content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
        # install the reply handler before sending, so the reply cannot be missed
        self.registrar.on_recv(self.complete_registration)
        self.session.send(self.registrar, "registration_request",content=content)

    def _connect_stream(self, socket_type, addr):
        """Return a new ZMQStream of `socket_type` carrying this engine's
        identity and connected to `addr` (resolved against ``self.location``)."""
        stream = zmqstream.ZMQStream(self.context.socket(socket_type), self.loop)
        stream.setsockopt(zmq.IDENTITY, self.ident)
        stream.connect(disambiguate_url(addr, self.location))
        return stream

    def complete_registration(self, msg):
        """Handle the controller's reply to the registration request.

        On ``status == 'ok'``: store the assigned id, connect the shell,
        control and iopub streams, optionally redirect stdout/stderr and
        the displayhook through iopub, start the kernel and the heartbeat.

        Raises
        ------
        Exception
            if the reply's status is anything other than ``'ok'``.
        """
        # a reply arrived, so cancel the pending registration timeout
        self._abort_dc.stop()
        identity = self.ident
        # previously an unconditional print() to stdout
        self.log.debug("engine identity: %s", identity)

        _, msg = self.session.feed_identities(msg)
        msg = Message(self.session.unpack_message(msg))
        content = msg.content

        if content.status != 'ok':
            failure = "Registration Failed: %s"%msg
            self.log.fatal(failure)
            raise Exception(failure)

        self.id = int(content.id)

        # create Shell Streams (MUX, Task, etc.):
        shell_addrs = [ str(content.mux) ]
        task_addr = content.task
        if task_addr:
            shell_addrs.append(str(task_addr))
        shell_streams = [ self._connect_stream(zmq.PAIR, addr) for addr in shell_addrs ]

        # control stream:
        control_stream = self._connect_stream(zmq.PAIR, str(content.control))

        # create iopub stream:
        iopub_stream = self._connect_stream(zmq.PUB, content.iopub)

        # heartbeat addresses are only resolved after the kernel has started
        hb_addrs = content.heartbeat

        # Redirect output streams and set a display hook.
        if self.out_stream_factory:
            sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
            sys.stdout.topic = 'engine.%i.stdout'%self.id
            sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
            sys.stderr.topic = 'engine.%i.stderr'%self.id
        if self.display_hook_factory:
            sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
            sys.displayhook.topic = 'engine.%i.pyout'%self.id

        self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
                control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
                loop=self.loop, user_ns = self.user_ns, logname=self.log.name)
        self.kernel.start()

        # launch heartbeat
        hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
        heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
        heart.start()

        self.log.info("Completed registration with id %i", self.id)

    def abort(self):
        """Give up on registration: notify the controller and exit with status 255."""
        self.log.fatal("Registration timed out")
        # NOTE(review): self.id is only assigned in complete_registration, so it
        # presumably still holds its default here -- confirm the controller copes.
        self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
        # brief pause before exiting, presumably to let the request leave -- TODO confirm
        time.sleep(1)
        sys.exit(255)

    def start(self):
        """Schedule registration on the event loop and arm the timeout that calls abort()."""
        dc = ioloop.DelayedCallback(self.register, 0, self.loop)
        dc.start()
        self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
        self._abort_dc.start()