streamkernel.py
435 lines
| 16.4 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
""" | ||||
Kernel adapted from kernel.py to use ZMQ Streams | ||||
""" | ||||
MinRK
|
r3569 | #----------------------------------------------------------------------------- | ||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
# Standard library imports. | ||||
MinRK
|
r3553 | from __future__ import print_function | ||
MinRK
|
r3539 | import __builtin__ | ||
MinRK
|
r3569 | from code import CommandCompiler | ||
MinRK
|
r3540 | import os | ||
MinRK
|
r3539 | import sys | ||
import time | ||||
import traceback | ||||
MinRK
|
r3578 | from datetime import datetime | ||
MinRK
|
r3539 | from signal import SIGTERM, SIGKILL | ||
MinRK
|
r3540 | from pprint import pprint | ||
MinRK
|
r3539 | |||
MinRK
|
r3569 | # System library imports. | ||
MinRK
|
r3539 | import zmq | ||
from zmq.eventloop import ioloop, zmqstream | ||||
MinRK
|
r3569 | # Local imports. | ||
MinRK
|
r3583 | from IPython.core import ultratb | ||
MinRK
|
r3569 | from IPython.utils.traitlets import HasTraits, Instance, List | ||
MinRK
|
r3546 | from IPython.zmq.completer import KernelCompleter | ||
MinRK
|
r3578 | from IPython.zmq.log import logger # a Logger object | ||
MinRK
|
r3546 | |||
MinRK
|
r3539 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ | ||
MinRK
|
r3580 | unpack_apply_message, ISO8601, wrap_exception | ||
MinRK
|
r3546 | from dependency import UnmetDependency | ||
MinRK
|
r3569 | import heartmonitor | ||
from client import Client | ||||
MinRK
|
r3539 | |||
MinRK
|
r3540 | def printer(*args): | ||
pprint(args) | ||||
MinRK
|
r3569 | #----------------------------------------------------------------------------- | ||
# Main kernel class | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3569 | class Kernel(HasTraits): | ||
MinRK
|
r3539 | |||
MinRK
|
r3569 | #--------------------------------------------------------------------------- | ||
# Kernel interface | ||||
#--------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3569 | session = Instance(StreamSession) | ||
shell_streams = Instance(list) | ||||
control_stream = Instance(zmqstream.ZMQStream) | ||||
task_stream = Instance(zmqstream.ZMQStream) | ||||
iopub_stream = Instance(zmqstream.ZMQStream) | ||||
client = Instance(Client) | ||||
MinRK
|
r3580 | loop = Instance(ioloop.IOLoop) | ||
MinRK
|
r3569 | |||
def __init__(self, **kwargs): | ||||
super(Kernel, self).__init__(**kwargs) | ||||
self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) | ||||
MinRK
|
r3539 | self.user_ns = {} | ||
self.history = [] | ||||
self.compiler = CommandCompiler() | ||||
self.completer = KernelCompleter(self.user_ns) | ||||
self.aborted = set() | ||||
# Build dict of handlers for message types | ||||
MinRK
|
r3569 | self.shell_handlers = {} | ||
MinRK
|
r3539 | self.control_handlers = {} | ||
MinRK
|
r3569 | for msg_type in ['execute_request', 'complete_request', 'apply_request', | ||
'clear_request']: | ||||
self.shell_handlers[msg_type] = getattr(self, msg_type) | ||||
MinRK
|
r3539 | |||
MinRK
|
r3569 | for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys(): | ||
MinRK
|
r3539 | self.control_handlers[msg_type] = getattr(self, msg_type) | ||
MinRK
|
r3583 | |||
def _wrap_exception(self, method=None): | ||||
e_info = dict(engineid=self.identity, method=method) | ||||
content=wrap_exception(e_info) | ||||
return content | ||||
MinRK
|
r3539 | #-------------------- control handlers ----------------------------- | ||
MinRK
|
r3540 | def abort_queues(self): | ||
MinRK
|
r3569 | for stream in self.shell_streams: | ||
MinRK
|
r3540 | if stream: | ||
self.abort_queue(stream) | ||||
MinRK
|
r3539 | |||
def abort_queue(self, stream): | ||||
while True: | ||||
try: | ||||
msg = self.session.recv(stream, zmq.NOBLOCK,content=True) | ||||
MinRK
|
r3556 | except zmq.ZMQError as e: | ||
MinRK
|
r3539 | if e.errno == zmq.EAGAIN: | ||
break | ||||
else: | ||||
return | ||||
else: | ||||
if msg is None: | ||||
return | ||||
else: | ||||
idents,msg = msg | ||||
# assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." | ||||
# msg = self.reply_socket.recv_json() | ||||
MinRK
|
r3553 | print ("Aborting:", file=sys.__stdout__) | ||
print (Message(msg), file=sys.__stdout__) | ||||
MinRK
|
r3539 | msg_type = msg['msg_type'] | ||
reply_type = msg_type.split('_')[0] + '_reply' | ||||
# reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | ||||
# self.reply_socket.send(ident,zmq.SNDMORE) | ||||
# self.reply_socket.send_json(reply_msg) | ||||
reply_msg = self.session.send(stream, reply_type, | ||||
MinRK
|
r3550 | content={'status' : 'aborted'}, parent=msg, ident=idents)[0] | ||
MinRK
|
r3553 | print(Message(reply_msg), file=sys.__stdout__) | ||
MinRK
|
r3539 | # We need to wait a bit for requests to come in. This can probably | ||
# be set shorter for true asynchronous clients. | ||||
time.sleep(0.05) | ||||
def abort_request(self, stream, ident, parent): | ||||
MinRK
|
r3540 | """abort a specifig msg by id""" | ||
MinRK
|
r3539 | msg_ids = parent['content'].get('msg_ids', None) | ||
MinRK
|
r3540 | if isinstance(msg_ids, basestring): | ||
msg_ids = [msg_ids] | ||||
MinRK
|
r3539 | if not msg_ids: | ||
MinRK
|
r3540 | self.abort_queues() | ||
MinRK
|
r3539 | for mid in msg_ids: | ||
MinRK
|
r3540 | self.aborted.add(str(mid)) | ||
MinRK
|
r3539 | |||
content = dict(status='ok') | ||||
MinRK
|
r3550 | reply_msg = self.session.send(stream, 'abort_reply', content=content, | ||
parent=parent, ident=ident)[0] | ||||
MinRK
|
r3553 | print(Message(reply_msg), file=sys.__stdout__) | ||
MinRK
|
r3539 | |||
MinRK
|
r3569 | def shutdown_request(self, stream, ident, parent): | ||
MinRK
|
r3560 | """kill ourself. This should really be handled in an external process""" | ||
MinRK
|
r3580 | try: | ||
self.abort_queues() | ||||
except: | ||||
MinRK
|
r3583 | content = self._wrap_exception('shutdown') | ||
MinRK
|
r3580 | else: | ||
content = dict(parent['content']) | ||||
content['status'] = 'ok' | ||||
MinRK
|
r3575 | msg = self.session.send(stream, 'shutdown_reply', | ||
content=content, parent=parent, ident=ident) | ||||
# msg = self.session.send(self.pub_socket, 'shutdown_reply', | ||||
# content, parent, ident) | ||||
MinRK
|
r3569 | # print >> sys.__stdout__, msg | ||
MinRK
|
r3580 | # time.sleep(0.2) | ||
dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop) | ||||
dc.start() | ||||
MinRK
|
r3560 | |||
MinRK
|
r3539 | def dispatch_control(self, msg): | ||
idents,msg = self.session.feed_identities(msg, copy=False) | ||||
MinRK
|
r3575 | try: | ||
msg = self.session.unpack_message(msg, content=True, copy=False) | ||||
except: | ||||
logger.error("Invalid Message", exc_info=True) | ||||
return | ||||
MinRK
|
r3539 | |||
header = msg['header'] | ||||
msg_id = header['msg_id'] | ||||
handler = self.control_handlers.get(msg['msg_type'], None) | ||||
if handler is None: | ||||
MinRK
|
r3553 | print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__) | ||
MinRK
|
r3539 | else: | ||
MinRK
|
r3540 | handler(self.control_stream, idents, msg) | ||
MinRK
|
r3539 | |||
#-------------------- queue helpers ------------------------------ | ||||
def check_dependencies(self, dependencies): | ||||
if not dependencies: | ||||
return True | ||||
if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): | ||||
anyorall = dependencies[0] | ||||
dependencies = dependencies[1] | ||||
else: | ||||
anyorall = 'all' | ||||
results = self.client.get_results(dependencies,status_only=True) | ||||
if results['status'] != 'ok': | ||||
return False | ||||
if anyorall == 'any': | ||||
if not results['completed']: | ||||
return False | ||||
else: | ||||
if results['pending']: | ||||
return False | ||||
return True | ||||
MinRK
|
r3540 | def check_aborted(self, msg_id): | ||
return msg_id in self.aborted | ||||
MinRK
|
r3539 | #-------------------- queue handlers ----------------------------- | ||
MinRK
|
r3569 | def clear_request(self, stream, idents, parent): | ||
"""Clear our namespace.""" | ||||
self.user_ns = {} | ||||
msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | ||||
content = dict(status='ok')) | ||||
MinRK
|
r3539 | def execute_request(self, stream, ident, parent): | ||
try: | ||||
code = parent[u'content'][u'code'] | ||||
except: | ||||
MinRK
|
r3553 | print("Got bad msg: ", file=sys.__stderr__) | ||
print(Message(parent), file=sys.__stderr__) | ||||
MinRK
|
r3539 | return | ||
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | ||||
MinRK
|
r3569 | # self.iopub_stream.send(pyin_msg) | ||
self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | ||||
MinRK
|
r3578 | started = datetime.now().strftime(ISO8601) | ||
MinRK
|
r3539 | try: | ||
comp_code = self.compiler(code, '<zmq-kernel>') | ||||
# allow for not overriding displayhook | ||||
if hasattr(sys.displayhook, 'set_parent'): | ||||
sys.displayhook.set_parent(parent) | ||||
exec comp_code in self.user_ns, self.user_ns | ||||
except: | ||||
MinRK
|
r3583 | exc_content = self._wrap_exception('execute') | ||
MinRK
|
r3539 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | ||
MinRK
|
r3569 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) | ||
MinRK
|
r3539 | reply_content = exc_content | ||
else: | ||||
reply_content = {'status' : 'ok'} | ||||
# reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | ||||
# self.reply_socket.send(ident, zmq.SNDMORE) | ||||
# self.reply_socket.send_json(reply_msg) | ||||
MinRK
|
r3578 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, | ||
ident=ident, subheader = dict(started=started)) | ||||
MinRK
|
r3553 | print(Message(reply_msg), file=sys.__stdout__) | ||
MinRK
|
r3539 | if reply_msg['content']['status'] == u'error': | ||
MinRK
|
r3540 | self.abort_queues() | ||
MinRK
|
r3539 | |||
def complete_request(self, stream, ident, parent): | ||||
matches = {'matches' : self.complete(parent), | ||||
'status' : 'ok'} | ||||
completion_msg = self.session.send(stream, 'complete_reply', | ||||
matches, parent, ident) | ||||
# print >> sys.__stdout__, completion_msg | ||||
def complete(self, msg): | ||||
return self.completer.complete(msg.content.line, msg.content.text) | ||||
def apply_request(self, stream, ident, parent): | ||||
MinRK
|
r3553 | print (parent) | ||
MinRK
|
r3539 | try: | ||
content = parent[u'content'] | ||||
bufs = parent[u'buffers'] | ||||
msg_id = parent['header']['msg_id'] | ||||
bound = content.get('bound', False) | ||||
except: | ||||
MinRK
|
r3553 | print("Got bad msg: ", file=sys.__stderr__) | ||
print(Message(parent), file=sys.__stderr__) | ||||
MinRK
|
r3539 | return | ||
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | ||||
MinRK
|
r3569 | # self.iopub_stream.send(pyin_msg) | ||
# self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | ||||
MinRK
|
r3578 | sub = {'dependencies_met' : True, 'engine' : self.identity, | ||
'started': datetime.now().strftime(ISO8601)} | ||||
MinRK
|
r3539 | try: | ||
# allow for not overriding displayhook | ||||
if hasattr(sys.displayhook, 'set_parent'): | ||||
sys.displayhook.set_parent(parent) | ||||
# exec "f(*args,**kwargs)" in self.user_ns, self.user_ns | ||||
if bound: | ||||
working = self.user_ns | ||||
suffix = str(msg_id).replace("-","") | ||||
prefix = "_" | ||||
else: | ||||
working = dict() | ||||
MinRK
|
r3540 | suffix = prefix = "_" # prevent keyword collisions with lambda | ||
MinRK
|
r3539 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | ||
# if f.fun | ||||
MinRK
|
r3588 | if hasattr(f, 'func_name'): | ||
fname = f.func_name | ||||
else: | ||||
fname = f.__name__ | ||||
fname = prefix+fname.strip('<>')+suffix | ||||
MinRK
|
r3539 | argname = prefix+"args"+suffix | ||
kwargname = prefix+"kwargs"+suffix | ||||
resultname = prefix+"result"+suffix | ||||
ns = { fname : f, argname : args, kwargname : kwargs } | ||||
# print ns | ||||
working.update(ns) | ||||
code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) | ||||
exec code in working, working | ||||
result = working.get(resultname) | ||||
# clear the namespace | ||||
if bound: | ||||
for key in ns.iterkeys(): | ||||
self.user_ns.pop(key) | ||||
else: | ||||
del working | ||||
packed_result,buf = serialize_object(result) | ||||
result_buf = [packed_result]+buf | ||||
except: | ||||
MinRK
|
r3583 | exc_content = self._wrap_exception('apply') | ||
MinRK
|
r3539 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | ||
MinRK
|
r3569 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) | ||
MinRK
|
r3539 | reply_content = exc_content | ||
result_buf = [] | ||||
MinRK
|
r3546 | |||
MinRK
|
r3583 | if exc_content['ename'] == UnmetDependency.__name__: | ||
MinRK
|
r3560 | sub['dependencies_met'] = False | ||
MinRK
|
r3539 | else: | ||
reply_content = {'status' : 'ok'} | ||||
# reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | ||||
# self.reply_socket.send(ident, zmq.SNDMORE) | ||||
# self.reply_socket.send_json(reply_msg) | ||||
MinRK
|
r3546 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | ||
parent=parent, ident=ident,buffers=result_buf, subheader=sub) | ||||
MinRK
|
r3553 | print(Message(reply_msg), file=sys.__stdout__) | ||
MinRK
|
r3546 | # if reply_msg['content']['status'] == u'error': | ||
# self.abort_queues() | ||||
MinRK
|
r3539 | |||
def dispatch_queue(self, stream, msg): | ||||
MinRK
|
r3541 | self.control_stream.flush() | ||
MinRK
|
r3539 | idents,msg = self.session.feed_identities(msg, copy=False) | ||
MinRK
|
r3575 | try: | ||
msg = self.session.unpack_message(msg, content=True, copy=False) | ||||
except: | ||||
logger.error("Invalid Message", exc_info=True) | ||||
return | ||||
MinRK
|
r3539 | |||
header = msg['header'] | ||||
msg_id = header['msg_id'] | ||||
if self.check_aborted(msg_id): | ||||
MinRK
|
r3540 | self.aborted.remove(msg_id) | ||
# is it safe to assume a msg_id will not be resubmitted? | ||||
reply_type = msg['msg_type'].split('_')[0] + '_reply' | ||||
reply_msg = self.session.send(stream, reply_type, | ||||
content={'status' : 'aborted'}, parent=msg, ident=idents) | ||||
return | ||||
MinRK
|
r3569 | handler = self.shell_handlers.get(msg['msg_type'], None) | ||
MinRK
|
r3539 | if handler is None: | ||
MinRK
|
r3553 | print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__) | ||
MinRK
|
r3539 | else: | ||
handler(stream, idents, msg) | ||||
def start(self): | ||||
#### stream mode: | ||||
if self.control_stream: | ||||
self.control_stream.on_recv(self.dispatch_control, copy=False) | ||||
MinRK
|
r3540 | self.control_stream.on_err(printer) | ||
MinRK
|
r3569 | |||
MinRK
|
r3578 | def make_dispatcher(stream): | ||
def dispatcher(msg): | ||||
return self.dispatch_queue(stream, msg) | ||||
return dispatcher | ||||
MinRK
|
r3569 | for s in self.shell_streams: | ||
MinRK
|
r3578 | s.on_recv(make_dispatcher(s), copy=False) | ||
MinRK
|
r3569 | s.on_err(printer) | ||
if self.iopub_stream: | ||||
self.iopub_stream.on_err(printer) | ||||
self.iopub_stream.on_send(printer) | ||||
MinRK
|
r3539 | |||
#### while True mode: | ||||
# while True: | ||||
# idle = True | ||||
# try: | ||||
MinRK
|
r3569 | # msg = self.shell_stream.socket.recv_multipart( | ||
MinRK
|
r3539 | # zmq.NOBLOCK, copy=False) | ||
# except zmq.ZMQError, e: | ||||
# if e.errno != zmq.EAGAIN: | ||||
# raise e | ||||
# else: | ||||
# idle=False | ||||
MinRK
|
r3569 | # self.dispatch_queue(self.shell_stream, msg) | ||
MinRK
|
r3539 | # | ||
# if not self.task_stream.empty(): | ||||
# idle=False | ||||
# msg = self.task_stream.recv_multipart() | ||||
# self.dispatch_queue(self.task_stream, msg) | ||||
# if idle: | ||||
# # don't busywait | ||||
# time.sleep(1e-3) | ||||
MinRK
|
r3569 | def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, | ||
MinRK
|
r3575 | client_addr=None, loop=None, context=None, key=None): | ||
MinRK
|
r3569 | # create loop, context, and session: | ||
if loop is None: | ||||
loop = ioloop.IOLoop.instance() | ||||
if context is None: | ||||
context = zmq.Context() | ||||
c = context | ||||
MinRK
|
r3575 | session = StreamSession(key=key) | ||
# print (session.key) | ||||
MinRK
|
r3569 | print (control_addr, shell_addrs, iopub_addr, hb_addrs) | ||
# create Control Stream | ||||
control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | ||||
control_stream.setsockopt(zmq.IDENTITY, identity) | ||||
control_stream.connect(control_addr) | ||||
# create Shell Streams (MUX, Task, etc.): | ||||
shell_streams = [] | ||||
for addr in shell_addrs: | ||||
stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | ||||
stream.setsockopt(zmq.IDENTITY, identity) | ||||
stream.connect(addr) | ||||
shell_streams.append(stream) | ||||
MinRK
|
r3539 | |||
MinRK
|
r3569 | # create iopub stream: | ||
iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop) | ||||
iopub_stream.setsockopt(zmq.IDENTITY, identity) | ||||
iopub_stream.connect(iopub_addr) | ||||
# launch heartbeat | ||||
heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) | ||||
heart.start() | ||||
# create (optional) Client | ||||
if client_addr: | ||||
client = Client(client_addr, username=identity) | ||||
else: | ||||
client = None | ||||
kernel = Kernel(session=session, control_stream=control_stream, | ||||
shell_streams=shell_streams, iopub_stream=iopub_stream, | ||||
MinRK
|
r3580 | client=client, loop=loop) | ||
MinRK
|
r3539 | kernel.start() | ||
Fernando Perez
|
r3576 | return loop, c, kernel | ||
MinRK
|
r3539 | |||