##// END OF EJS Templates
split get_results into get_result/result_status, add AsyncHubResult
split get_results into get_result/result_status, add AsyncHubResult

File last commit:

r3631:d72427c6
r3639:7724ff79
Show More
engine.py
147 lines | 5.5 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""A simple engine that talks to a controller over 0MQ.
it handles registration, etc. and launches a kernel
connected to the Controller's queue(s).
"""
MinRK
added simple cluster entry point
r3552 from __future__ import print_function
MinRK
resort imports in a cleaner order
r3631
import logging
MinRK
prep newparallel for rebase...
r3539 import sys
import time
import uuid
MinRK
control channel progress
r3540 from pprint import pprint
MinRK
prep newparallel for rebase...
r3539
import zmq
from zmq.eventloop import ioloop, zmqstream
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # internal
from IPython.config.configurable import Configurable
MinRK
persist connection data to disk as json
r3614 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # from IPython.utils.localinterfaces import LOCALHOST
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569
MinRK
resort imports in a cleaner order
r3631 import heartmonitor
MinRK
Refactor newparallel to use Config system...
r3604 from factory import RegistrationFactory
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 from streamkernel import Kernel
MinRK
resort imports in a cleaner order
r3631 from streamsession import Message
from util import disambiguate_url
MinRK
prep newparallel for rebase...
r3539
def printer(*msg):
MinRK
rework logging connections
r3610 # print (self.log.handlers, file=sys.__stdout__)
self.log.info(str(msg))
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
r3604 class EngineFactory(RegistrationFactory):
MinRK
prep newparallel for rebase...
r3539 """IPython engine"""
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # configurables:
MinRK
Refactor newparallel to use Config system...
r3604 user_ns=Dict(config=True)
out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
MinRK
persist connection data to disk as json
r3614 location=Str(config=True)
timeout=CFloat(2,config=True)
MinRK
Refactor newparallel to use Config system...
r3604
# not configurable:
id=Int(allow_none=True)
registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
kernel=Instance(Kernel)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603
def __init__(self, **kwargs):
MinRK
Refactor newparallel to use Config system...
r3604 super(EngineFactory, self).__init__(**kwargs)
ctx = self.context
reg = ctx.socket(zmq.PAIR)
reg.setsockopt(zmq.IDENTITY, self.ident)
reg.connect(self.url)
self.registrar = zmqstream.ZMQStream(reg, self.loop)
MinRK
prep newparallel for rebase...
r3539
def register(self):
MinRK
Refactor newparallel to use Config system...
r3604 """send the registration_request"""
MinRK
prep newparallel for rebase...
r3539
MinRK
rework logging connections
r3610 self.log.info("registering")
MinRK
control channel progress
r3540 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
MinRK
prep newparallel for rebase...
r3539 self.registrar.on_recv(self.complete_registration)
MinRK
added exec_key and fixed client.shutdown
r3575 # print (self.session.key)
MinRK
prep newparallel for rebase...
r3539 self.session.send(self.registrar, "registration_request",content=content)
def complete_registration(self, msg):
# print msg
MinRK
persist connection data to disk as json
r3614 self._abort_dc.stop()
MinRK
Refactor newparallel to use Config system...
r3604 ctx = self.context
loop = self.loop
identity = self.ident
print (identity)
MinRK
prep newparallel for rebase...
r3539 idents,msg = self.session.feed_identities(msg)
msg = Message(self.session.unpack_message(msg))
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
prep newparallel for rebase...
r3539 if msg.content.status == 'ok':
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 self.id = int(msg.content.id)
MinRK
Refactor newparallel to use Config system...
r3604
# create Shell Streams (MUX, Task, etc.):
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 queue_addr = msg.content.mux
shell_addrs = [ str(queue_addr) ]
MinRK
prep newparallel for rebase...
r3539 task_addr = msg.content.task
if task_addr:
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 shell_addrs.append(str(task_addr))
MinRK
Refactor newparallel to use Config system...
r3604 shell_streams = []
for addr in shell_addrs:
stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
stream.setsockopt(zmq.IDENTITY, identity)
MinRK
persist connection data to disk as json
r3614 stream.connect(disambiguate_url(addr, self.location))
MinRK
Refactor newparallel to use Config system...
r3604 shell_streams.append(stream)
# control stream:
control_addr = str(msg.content.control)
control_stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
control_stream.setsockopt(zmq.IDENTITY, identity)
MinRK
persist connection data to disk as json
r3614 control_stream.connect(disambiguate_url(control_addr, self.location))
MinRK
prep newparallel for rebase...
r3539
MinRK
Refactor newparallel to use Config system...
r3604 # create iopub stream:
iopub_addr = msg.content.iopub
iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
iopub_stream.setsockopt(zmq.IDENTITY, identity)
MinRK
persist connection data to disk as json
r3614 iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
MinRK
Refactor newparallel to use Config system...
r3604
# launch heartbeat
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569 hb_addrs = msg.content.heartbeat
MinRK
Refactor newparallel to use Config system...
r3604 # print (hb_addrs)
# # Redirect input streams and set a display hook.
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 if self.out_stream_factory:
sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
sys.stdout.topic = 'engine.%i.stdout'%self.id
sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
sys.stderr.topic = 'engine.%i.stderr'%self.id
if self.display_hook_factory:
sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
sys.displayhook.topic = 'engine.%i.pyout'%self.id
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
rework logging connections
r3610 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
loop=loop, user_ns = self.user_ns, logname=self.log.name)
MinRK
Refactor newparallel to use Config system...
r3604 self.kernel.start()
MinRK
persist connection data to disk as json
r3614 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
MinRK
Refactor newparallel to use Config system...
r3604 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
MinRK
adapt kernel's ipcluster and Launchers to newparallel
r3605 # ioloop.DelayedCallback(heart.start, 1000, self.loop).start()
MinRK
Refactor newparallel to use Config system...
r3604 heart.start()
MinRK
Parallel kernel/engine startup looks a bit more like pykernel
r3569
MinRK
prep newparallel for rebase...
r3539 else:
MinRK
persist connection data to disk as json
r3614 self.log.fatal("Registration Failed: %s"%msg)
MinRK
prep newparallel for rebase...
r3539 raise Exception("Registration Failed: %s"%msg)
MinRK
rework logging connections
r3610 self.log.info("Completed registration with id %i"%self.id)
MinRK
Refactor newparallel to use Config system...
r3604
MinRK
prep newparallel for rebase...
r3539
MinRK
persist connection data to disk as json
r3614 def abort(self):
self.log.fatal("Registration timed out")
MinRK
Refactor newparallel to use Config system...
r3604 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
MinRK
prep newparallel for rebase...
r3539 time.sleep(1)
MinRK
persist connection data to disk as json
r3614 sys.exit(255)
MinRK
prep newparallel for rebase...
r3539
def start(self):
MinRK
Refactor newparallel to use Config system...
r3604 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
dc.start()
MinRK
persist connection data to disk as json
r3614 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
self._abort_dc.start()
MinRK
Refactor newparallel to use Config system...
r3604