streamkernel.py
484 lines
| 17.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
""" | ||||
Kernel adapted from kernel.py to use ZMQ Streams | ||||
""" | ||||
MinRK
|
r3569 | #----------------------------------------------------------------------------- | ||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
# Standard library imports. | ||||
MinRK
|
r3553 | from __future__ import print_function | ||
MinRK
|
r3631 | |||
MinRK
|
r3539 | import sys | ||
import time | ||||
MinRK
|
r3631 | |||
from code import CommandCompiler | ||||
MinRK
|
r3578 | from datetime import datetime | ||
MinRK
|
r3540 | from pprint import pprint | ||
MinRK
|
r3631 | from signal import SIGTERM, SIGKILL | ||
MinRK
|
r3539 | |||
MinRK
|
r3569 | # System library imports. | ||
MinRK
|
r3539 | import zmq | ||
from zmq.eventloop import ioloop, zmqstream | ||||
MinRK
|
r3569 | # Local imports. | ||
MinRK
|
r3583 | from IPython.core import ultratb | ||
MinRK
|
r3644 | from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Str | ||
MinRK
|
r3546 | from IPython.zmq.completer import KernelCompleter | ||
MinRK
|
r3602 | from IPython.zmq.iostream import OutStream | ||
from IPython.zmq.displayhook import DisplayHook | ||||
MinRK
|
r3642 | from . import heartmonitor | ||
from .client import Client | ||||
MinRK
|
r3644 | from .error import wrap_exception | ||
MinRK
|
r3642 | from .factory import SessionFactory | ||
MinRK
|
r3644 | from .streamsession import StreamSession | ||
from .util import serialize_object, unpack_apply_message, ISO8601 | ||||
MinRK
|
r3539 | |||
MinRK
|
r3540 | def printer(*args): | ||
MinRK
|
r3602 | pprint(args, stream=sys.__stdout__) | ||
MinRK
|
r3540 | |||
MinRK
|
r3604 | |||
class _Passer: | ||||
"""Empty class that implements `send()` that does nothing.""" | ||||
def send(self, *args, **kwargs): | ||||
pass | ||||
send_multipart = send | ||||
MinRK
|
r3569 | #----------------------------------------------------------------------------- | ||
# Main kernel class | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3604 | class Kernel(SessionFactory): | ||
MinRK
|
r3539 | |||
MinRK
|
r3569 | #--------------------------------------------------------------------------- | ||
# Kernel interface | ||||
#--------------------------------------------------------------------------- | ||||
MinRK
|
r3604 | |||
# kwargs: | ||||
int_id = Int(-1, config=True) | ||||
user_ns = Dict(config=True) | ||||
exec_lines = List(config=True) | ||||
MinRK
|
r3569 | control_stream = Instance(zmqstream.ZMQStream) | ||
task_stream = Instance(zmqstream.ZMQStream) | ||||
iopub_stream = Instance(zmqstream.ZMQStream) | ||||
MinRK
|
r3604 | client = Instance('IPython.zmq.parallel.client.Client') | ||
# internals | ||||
shell_streams = List() | ||||
compiler = Instance(CommandCompiler, (), {}) | ||||
completer = Instance(KernelCompleter) | ||||
aborted = Set() | ||||
shell_handlers = Dict() | ||||
control_handlers = Dict() | ||||
def _set_prefix(self): | ||||
self.prefix = "engine.%s"%self.int_id | ||||
def _connect_completer(self): | ||||
self.completer = KernelCompleter(self.user_ns) | ||||
MinRK
|
r3569 | |||
def __init__(self, **kwargs): | ||||
super(Kernel, self).__init__(**kwargs) | ||||
MinRK
|
r3604 | self._set_prefix() | ||
self._connect_completer() | ||||
self.on_trait_change(self._set_prefix, 'id') | ||||
self.on_trait_change(self._connect_completer, 'user_ns') | ||||
MinRK
|
r3539 | |||
# Build dict of handlers for message types | ||||
MinRK
|
r3569 | for msg_type in ['execute_request', 'complete_request', 'apply_request', | ||
'clear_request']: | ||||
self.shell_handlers[msg_type] = getattr(self, msg_type) | ||||
MinRK
|
r3539 | |||
MinRK
|
r3569 | for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys(): | ||
MinRK
|
r3539 | self.control_handlers[msg_type] = getattr(self, msg_type) | ||
MinRK
|
r3604 | |||
self._initial_exec_lines() | ||||
MinRK
|
r3583 | |||
def _wrap_exception(self, method=None): | ||||
MinRK
|
r3641 | e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method) | ||
MinRK
|
r3583 | content=wrap_exception(e_info) | ||
return content | ||||
MinRK
|
r3604 | def _initial_exec_lines(self): | ||
s = _Passer() | ||||
content = dict(silent=True, user_variable=[],user_expressions=[]) | ||||
for line in self.exec_lines: | ||||
MinRK
|
r3610 | self.log.debug("executing initialization: %s"%line) | ||
MinRK
|
r3604 | content.update({'code':line}) | ||
msg = self.session.msg('execute_request', content) | ||||
self.execute_request(s, [], msg) | ||||
MinRK
|
r3539 | #-------------------- control handlers ----------------------------- | ||
MinRK
|
r3540 | def abort_queues(self): | ||
MinRK
|
r3569 | for stream in self.shell_streams: | ||
MinRK
|
r3540 | if stream: | ||
self.abort_queue(stream) | ||||
MinRK
|
r3539 | |||
def abort_queue(self, stream): | ||||
while True: | ||||
try: | ||||
msg = self.session.recv(stream, zmq.NOBLOCK,content=True) | ||||
MinRK
|
r3556 | except zmq.ZMQError as e: | ||
MinRK
|
r3539 | if e.errno == zmq.EAGAIN: | ||
break | ||||
else: | ||||
return | ||||
else: | ||||
if msg is None: | ||||
return | ||||
else: | ||||
idents,msg = msg | ||||
# assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." | ||||
# msg = self.reply_socket.recv_json() | ||||
MinRK
|
r3610 | self.log.info("Aborting:") | ||
self.log.info(str(msg)) | ||||
MinRK
|
r3539 | msg_type = msg['msg_type'] | ||
reply_type = msg_type.split('_')[0] + '_reply' | ||||
# reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | ||||
# self.reply_socket.send(ident,zmq.SNDMORE) | ||||
# self.reply_socket.send_json(reply_msg) | ||||
reply_msg = self.session.send(stream, reply_type, | ||||
MinRK
|
r3550 | content={'status' : 'aborted'}, parent=msg, ident=idents)[0] | ||
MinRK
|
r3610 | self.log.debug(str(reply_msg)) | ||
MinRK
|
r3539 | # We need to wait a bit for requests to come in. This can probably | ||
# be set shorter for true asynchronous clients. | ||||
time.sleep(0.05) | ||||
def abort_request(self, stream, ident, parent): | ||||
MinRK
|
r3540 | """abort a specifig msg by id""" | ||
MinRK
|
r3539 | msg_ids = parent['content'].get('msg_ids', None) | ||
MinRK
|
r3540 | if isinstance(msg_ids, basestring): | ||
msg_ids = [msg_ids] | ||||
MinRK
|
r3539 | if not msg_ids: | ||
MinRK
|
r3540 | self.abort_queues() | ||
MinRK
|
r3539 | for mid in msg_ids: | ||
MinRK
|
r3540 | self.aborted.add(str(mid)) | ||
MinRK
|
r3539 | |||
content = dict(status='ok') | ||||
MinRK
|
r3550 | reply_msg = self.session.send(stream, 'abort_reply', content=content, | ||
parent=parent, ident=ident)[0] | ||||
MinRK
|
r3610 | self.log.debug(str(reply_msg)) | ||
MinRK
|
r3539 | |||
MinRK
|
r3569 | def shutdown_request(self, stream, ident, parent): | ||
MinRK
|
r3560 | """kill ourself. This should really be handled in an external process""" | ||
MinRK
|
r3580 | try: | ||
self.abort_queues() | ||||
except: | ||||
MinRK
|
r3583 | content = self._wrap_exception('shutdown') | ||
MinRK
|
r3580 | else: | ||
content = dict(parent['content']) | ||||
content['status'] = 'ok' | ||||
MinRK
|
r3575 | msg = self.session.send(stream, 'shutdown_reply', | ||
content=content, parent=parent, ident=ident) | ||||
# msg = self.session.send(self.pub_socket, 'shutdown_reply', | ||||
# content, parent, ident) | ||||
MinRK
|
r3569 | # print >> sys.__stdout__, msg | ||
MinRK
|
r3580 | # time.sleep(0.2) | ||
dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop) | ||||
dc.start() | ||||
MinRK
|
r3560 | |||
MinRK
|
r3539 | def dispatch_control(self, msg): | ||
idents,msg = self.session.feed_identities(msg, copy=False) | ||||
MinRK
|
r3575 | try: | ||
msg = self.session.unpack_message(msg, content=True, copy=False) | ||||
except: | ||||
MinRK
|
r3610 | self.log.error("Invalid Message", exc_info=True) | ||
MinRK
|
r3575 | return | ||
MinRK
|
r3539 | |||
header = msg['header'] | ||||
msg_id = header['msg_id'] | ||||
handler = self.control_handlers.get(msg['msg_type'], None) | ||||
if handler is None: | ||||
MinRK
|
r3610 | self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type']) | ||
MinRK
|
r3539 | else: | ||
MinRK
|
r3540 | handler(self.control_stream, idents, msg) | ||
MinRK
|
r3539 | |||
#-------------------- queue helpers ------------------------------ | ||||
def check_dependencies(self, dependencies): | ||||
if not dependencies: | ||||
return True | ||||
if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): | ||||
anyorall = dependencies[0] | ||||
dependencies = dependencies[1] | ||||
else: | ||||
anyorall = 'all' | ||||
results = self.client.get_results(dependencies,status_only=True) | ||||
if results['status'] != 'ok': | ||||
return False | ||||
if anyorall == 'any': | ||||
if not results['completed']: | ||||
return False | ||||
else: | ||||
if results['pending']: | ||||
return False | ||||
return True | ||||
MinRK
|
r3540 | def check_aborted(self, msg_id): | ||
return msg_id in self.aborted | ||||
MinRK
|
r3539 | #-------------------- queue handlers ----------------------------- | ||
MinRK
|
r3569 | def clear_request(self, stream, idents, parent): | ||
"""Clear our namespace.""" | ||||
self.user_ns = {} | ||||
msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, | ||||
content = dict(status='ok')) | ||||
MinRK
|
r3604 | self._initial_exec_lines() | ||
MinRK
|
r3569 | |||
MinRK
|
r3539 | def execute_request(self, stream, ident, parent): | ||
MinRK
|
r3610 | self.log.debug('execute request %s'%parent) | ||
MinRK
|
r3539 | try: | ||
code = parent[u'content'][u'code'] | ||||
except: | ||||
MinRK
|
r3610 | self.log.error("Got bad msg: %s"%parent, exc_info=True) | ||
MinRK
|
r3539 | return | ||
MinRK
|
r3602 | self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, | ||
MinRK
|
r3603 | ident='%s.pyin'%self.prefix) | ||
MinRK
|
r3578 | started = datetime.now().strftime(ISO8601) | ||
MinRK
|
r3539 | try: | ||
comp_code = self.compiler(code, '<zmq-kernel>') | ||||
# allow for not overriding displayhook | ||||
if hasattr(sys.displayhook, 'set_parent'): | ||||
sys.displayhook.set_parent(parent) | ||||
MinRK
|
r3602 | sys.stdout.set_parent(parent) | ||
sys.stderr.set_parent(parent) | ||||
MinRK
|
r3539 | exec comp_code in self.user_ns, self.user_ns | ||
except: | ||||
MinRK
|
r3583 | exc_content = self._wrap_exception('execute') | ||
MinRK
|
r3539 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | ||
MinRK
|
r3602 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | ||
MinRK
|
r3603 | ident='%s.pyerr'%self.prefix) | ||
MinRK
|
r3539 | reply_content = exc_content | ||
else: | ||||
reply_content = {'status' : 'ok'} | ||||
MinRK
|
r3607 | |||
MinRK
|
r3578 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, | ||
ident=ident, subheader = dict(started=started)) | ||||
MinRK
|
r3610 | self.log.debug(str(reply_msg)) | ||
MinRK
|
r3539 | if reply_msg['content']['status'] == u'error': | ||
MinRK
|
r3540 | self.abort_queues() | ||
MinRK
|
r3539 | |||
def complete_request(self, stream, ident, parent): | ||||
matches = {'matches' : self.complete(parent), | ||||
'status' : 'ok'} | ||||
completion_msg = self.session.send(stream, 'complete_reply', | ||||
matches, parent, ident) | ||||
# print >> sys.__stdout__, completion_msg | ||||
def complete(self, msg): | ||||
return self.completer.complete(msg.content.line, msg.content.text) | ||||
def apply_request(self, stream, ident, parent): | ||||
MinRK
|
r3627 | # flush previous reply, so this request won't block it | ||
stream.flush(zmq.POLLOUT) | ||||
MinRK
|
r3539 | try: | ||
content = parent[u'content'] | ||||
bufs = parent[u'buffers'] | ||||
msg_id = parent['header']['msg_id'] | ||||
bound = content.get('bound', False) | ||||
except: | ||||
MinRK
|
r3610 | self.log.error("Got bad msg: %s"%parent, exc_info=True) | ||
MinRK
|
r3539 | return | ||
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | ||||
MinRK
|
r3569 | # self.iopub_stream.send(pyin_msg) | ||
# self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) | ||||
MinRK
|
r3604 | sub = {'dependencies_met' : True, 'engine' : self.ident, | ||
MinRK
|
r3578 | 'started': datetime.now().strftime(ISO8601)} | ||
MinRK
|
r3539 | try: | ||
# allow for not overriding displayhook | ||||
if hasattr(sys.displayhook, 'set_parent'): | ||||
sys.displayhook.set_parent(parent) | ||||
MinRK
|
r3602 | sys.stdout.set_parent(parent) | ||
sys.stderr.set_parent(parent) | ||||
MinRK
|
r3539 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns | ||
if bound: | ||||
working = self.user_ns | ||||
suffix = str(msg_id).replace("-","") | ||||
prefix = "_" | ||||
else: | ||||
working = dict() | ||||
MinRK
|
r3540 | suffix = prefix = "_" # prevent keyword collisions with lambda | ||
MinRK
|
r3539 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | ||
# if f.fun | ||||
MinRK
|
r3607 | fname = getattr(f, '__name__', 'f') | ||
MinRK
|
r3588 | |||
fname = prefix+fname.strip('<>')+suffix | ||||
MinRK
|
r3539 | argname = prefix+"args"+suffix | ||
kwargname = prefix+"kwargs"+suffix | ||||
resultname = prefix+"result"+suffix | ||||
ns = { fname : f, argname : args, kwargname : kwargs } | ||||
# print ns | ||||
working.update(ns) | ||||
code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) | ||||
exec code in working, working | ||||
result = working.get(resultname) | ||||
# clear the namespace | ||||
if bound: | ||||
for key in ns.iterkeys(): | ||||
self.user_ns.pop(key) | ||||
else: | ||||
del working | ||||
packed_result,buf = serialize_object(result) | ||||
result_buf = [packed_result]+buf | ||||
except: | ||||
MinRK
|
r3583 | exc_content = self._wrap_exception('apply') | ||
MinRK
|
r3539 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | ||
MinRK
|
r3602 | self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, | ||
MinRK
|
r3603 | ident='%s.pyerr'%self.prefix) | ||
MinRK
|
r3539 | reply_content = exc_content | ||
result_buf = [] | ||||
MinRK
|
r3546 | |||
MinRK
|
r3607 | if exc_content['ename'] == 'UnmetDependency': | ||
MinRK
|
r3560 | sub['dependencies_met'] = False | ||
MinRK
|
r3539 | else: | ||
reply_content = {'status' : 'ok'} | ||||
MinRK
|
r3607 | |||
# put 'ok'/'error' status in header, for scheduler introspection: | ||||
sub['status'] = reply_content['status'] | ||||
MinRK
|
r3546 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | ||
parent=parent, ident=ident,buffers=result_buf, subheader=sub) | ||||
MinRK
|
r3627 | |||
MinRK
|
r3546 | # if reply_msg['content']['status'] == u'error': | ||
# self.abort_queues() | ||||
MinRK
|
r3539 | |||
def dispatch_queue(self, stream, msg): | ||||
MinRK
|
r3541 | self.control_stream.flush() | ||
MinRK
|
r3539 | idents,msg = self.session.feed_identities(msg, copy=False) | ||
MinRK
|
r3575 | try: | ||
msg = self.session.unpack_message(msg, content=True, copy=False) | ||||
except: | ||||
MinRK
|
r3610 | self.log.error("Invalid Message", exc_info=True) | ||
MinRK
|
r3575 | return | ||
MinRK
|
r3539 | |||
header = msg['header'] | ||||
msg_id = header['msg_id'] | ||||
if self.check_aborted(msg_id): | ||||
MinRK
|
r3540 | self.aborted.remove(msg_id) | ||
# is it safe to assume a msg_id will not be resubmitted? | ||||
reply_type = msg['msg_type'].split('_')[0] + '_reply' | ||||
reply_msg = self.session.send(stream, reply_type, | ||||
content={'status' : 'aborted'}, parent=msg, ident=idents) | ||||
return | ||||
MinRK
|
r3569 | handler = self.shell_handlers.get(msg['msg_type'], None) | ||
MinRK
|
r3539 | if handler is None: | ||
MinRK
|
r3610 | self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type']) | ||
MinRK
|
r3539 | else: | ||
handler(stream, idents, msg) | ||||
def start(self): | ||||
#### stream mode: | ||||
if self.control_stream: | ||||
self.control_stream.on_recv(self.dispatch_control, copy=False) | ||||
MinRK
|
r3540 | self.control_stream.on_err(printer) | ||
MinRK
|
r3569 | |||
MinRK
|
r3578 | def make_dispatcher(stream): | ||
def dispatcher(msg): | ||||
return self.dispatch_queue(stream, msg) | ||||
return dispatcher | ||||
MinRK
|
r3569 | for s in self.shell_streams: | ||
MinRK
|
r3578 | s.on_recv(make_dispatcher(s), copy=False) | ||
MinRK
|
r3607 | s.on_err(printer) | ||
MinRK
|
r3569 | |||
if self.iopub_stream: | ||||
self.iopub_stream.on_err(printer) | ||||
MinRK
|
r3539 | |||
#### while True mode: | ||||
# while True: | ||||
# idle = True | ||||
# try: | ||||
MinRK
|
r3569 | # msg = self.shell_stream.socket.recv_multipart( | ||
MinRK
|
r3539 | # zmq.NOBLOCK, copy=False) | ||
# except zmq.ZMQError, e: | ||||
# if e.errno != zmq.EAGAIN: | ||||
# raise e | ||||
# else: | ||||
# idle=False | ||||
MinRK
|
r3569 | # self.dispatch_queue(self.shell_stream, msg) | ||
MinRK
|
r3539 | # | ||
# if not self.task_stream.empty(): | ||||
# idle=False | ||||
# msg = self.task_stream.recv_multipart() | ||||
# self.dispatch_queue(self.task_stream, msg) | ||||
# if idle: | ||||
# # don't busywait | ||||
# time.sleep(1e-3) | ||||
MinRK
|
r3603 | def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs, | ||
MinRK
|
r3602 | client_addr=None, loop=None, context=None, key=None, | ||
out_stream_factory=OutStream, display_hook_factory=DisplayHook): | ||||
MinRK
|
r3604 | """NO LONGER IN USE""" | ||
MinRK
|
r3569 | # create loop, context, and session: | ||
if loop is None: | ||||
loop = ioloop.IOLoop.instance() | ||||
if context is None: | ||||
context = zmq.Context() | ||||
c = context | ||||
MinRK
|
r3575 | session = StreamSession(key=key) | ||
# print (session.key) | ||||
MinRK
|
r3603 | # print (control_addr, shell_addrs, iopub_addr, hb_addrs) | ||
MinRK
|
r3569 | |||
# create Control Stream | ||||
control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | ||||
control_stream.setsockopt(zmq.IDENTITY, identity) | ||||
control_stream.connect(control_addr) | ||||
# create Shell Streams (MUX, Task, etc.): | ||||
shell_streams = [] | ||||
for addr in shell_addrs: | ||||
stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) | ||||
stream.setsockopt(zmq.IDENTITY, identity) | ||||
stream.connect(addr) | ||||
shell_streams.append(stream) | ||||
MinRK
|
r3539 | |||
MinRK
|
r3569 | # create iopub stream: | ||
iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop) | ||||
iopub_stream.setsockopt(zmq.IDENTITY, identity) | ||||
iopub_stream.connect(iopub_addr) | ||||
MinRK
|
r3602 | # Redirect input streams and set a display hook. | ||
if out_stream_factory: | ||||
sys.stdout = out_stream_factory(session, iopub_stream, u'stdout') | ||||
MinRK
|
r3603 | sys.stdout.topic = 'engine.%i.stdout'%int_id | ||
MinRK
|
r3602 | sys.stderr = out_stream_factory(session, iopub_stream, u'stderr') | ||
MinRK
|
r3603 | sys.stderr.topic = 'engine.%i.stderr'%int_id | ||
MinRK
|
r3602 | if display_hook_factory: | ||
sys.displayhook = display_hook_factory(session, iopub_stream) | ||||
MinRK
|
r3603 | sys.displayhook.topic = 'engine.%i.pyout'%int_id | ||
MinRK
|
r3602 | |||
MinRK
|
r3569 | # launch heartbeat | ||
heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) | ||||
heart.start() | ||||
# create (optional) Client | ||||
if client_addr: | ||||
client = Client(client_addr, username=identity) | ||||
else: | ||||
client = None | ||||
MinRK
|
r3604 | kernel = Kernel(id=int_id, session=session, control_stream=control_stream, | ||
MinRK
|
r3569 | shell_streams=shell_streams, iopub_stream=iopub_stream, | ||
MinRK
|
r3580 | client=client, loop=loop) | ||
MinRK
|
r3539 | kernel.start() | ||
Fernando Perez
|
r3576 | return loop, c, kernel | ||
MinRK
|
r3539 | |||