streamkernel.py
438 lines
| 15.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
""" | ||||
Kernel adapted from kernel.py to use ZMQ Streams | ||||
MinRK
|
r4018 | |||
Authors: | ||||
* Min RK | ||||
* Brian Granger | ||||
* Fernando Perez | ||||
* Evan Patterson | ||||
MinRK
|
r3539 | """ | ||
MinRK
|
r3660 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2010-2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3569 | #----------------------------------------------------------------------------- | ||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
# Standard library imports. | ||||
MinRK
|
r3553 | from __future__ import print_function | ||
MinRK
|
r3631 | |||
MinRK
|
r3539 | import sys | ||
import time | ||||
MinRK
|
r3631 | |||
from code import CommandCompiler | ||||
MinRK
|
r3578 | from datetime import datetime | ||
MinRK
|
r3540 | from pprint import pprint | ||
MinRK
|
r3539 | |||
MinRK
|
r3569 | # System library imports. | ||
MinRK
|
r3539 | import zmq | ||
from zmq.eventloop import ioloop, zmqstream | ||||
MinRK
|
r3569 | # Local imports. | ||
MinRK
|
r4155 | from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes | ||
MinRK
|
r3546 | from IPython.zmq.completer import KernelCompleter | ||
MinRK
|
r3602 | |||
MinRK
|
r3673 | from IPython.parallel.error import wrap_exception | ||
from IPython.parallel.factory import SessionFactory | ||||
MinRK
|
r4161 | from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes | ||
MinRK
|
r3539 | |||
MinRK
|
def printer(*args):
    """Pretty-print the positional arguments, as a single tuple, to the
    interpreter's original stdout (``sys.__stdout__``)."""
    # sys.__stdout__ is looked up at call time, so output goes to the original
    # stream even when sys.stdout has been replaced.
    real_stdout = sys.__stdout__
    pprint(args, stream=real_stdout)
MinRK
|
r3540 | |||
MinRK
|
r3604 | |||
MinRK
|
class _Passer(zmqstream.ZMQStream):
    """Inert stand-in for a stream: construction and sending do nothing.

    It derives from ZMQStream only so that it passes Session typechecking.
    """

    def __init__(self, *args, **kwargs):
        # Deliberately skip ZMQStream.__init__: all arguments are ignored and
        # no stream state is set up.
        return None

    def send(self, *args, **kwargs):
        # Accept anything and drop it.
        return None

    # Multipart sends are discarded the same way.
    send_multipart = send
MinRK
|
r3569 | #----------------------------------------------------------------------------- | ||
# Main kernel class | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
class Kernel(SessionFactory):
    """Kernel that serves execute/apply/complete/clear requests over ZMQ streams.

    Requests arriving on the shell streams are routed by `dispatch_queue`
    through `shell_handlers`; requests on the control stream are routed by
    `dispatch_control` through `control_handlers`.  Every handler has the
    signature ``handler(stream, idents, msg)``.
    """

    #---------------------------------------------------------------------------
    # Kernel interface
    #---------------------------------------------------------------------------

    # kwargs:
    exec_lines = List(Unicode, config=True,
        help="List of lines to execute")

    # identities:
    int_id = Int(-1)
    bident = CBytes()
    ident = Unicode()

    def _ident_changed(self, name, old, new):
        """Mirror a new `ident` value into `bident` as bytes.

        Named after the traitlets ``_<trait>_changed`` convention, so it is
        expected to run whenever `ident` is assigned.
        """
        self.bident = asbytes(new)

    user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")

    control_stream = Instance(zmqstream.ZMQStream)
    task_stream = Instance(zmqstream.ZMQStream)
    iopub_stream = Instance(zmqstream.ZMQStream)
    client = Instance('IPython.parallel.Client')

    # internals
    shell_streams = List()
    compiler = Instance(CommandCompiler, (), {})
    completer = Instance(KernelCompleter)

    aborted = Set()             # msg_ids (as str) that must be answered 'aborted'
    shell_handlers = Dict()     # msg_type -> bound handler, for shell streams
    control_handlers = Dict()   # msg_type -> bound handler, for the control stream

    def _set_prefix(self):
        """Recompute the iopub topic prefix (``engine.<int_id>``)."""
        self.prefix = "engine.%s"%self.int_id

    def _connect_completer(self):
        """Build a fresh completer bound to the current `user_ns`."""
        self.completer = KernelCompleter(self.user_ns)

    def __init__(self, **kwargs):
        """Initialise traits from `kwargs`, build the handler tables and run
        the configured `exec_lines`."""
        super(Kernel, self).__init__(**kwargs)
        self._set_prefix()
        self._connect_completer()

        # The prefix is derived from ``int_id``, so that is the trait whose
        # changes must refresh it.  The historical 'id' registration is kept
        # too, in case a base class exposes a trait with that name.
        self.on_trait_change(self._set_prefix, 'id')
        self.on_trait_change(self._set_prefix, 'int_id')
        self.on_trait_change(self._connect_completer, 'user_ns')

        # Build dict of handlers for message types
        for msg_type in ['execute_request', 'complete_request', 'apply_request',
                         'clear_request']:
            self.shell_handlers[msg_type] = getattr(self, msg_type)

        # The control channel accepts every shell message type plus two of
        # its own.  list(...) rather than .keys() + list so the concatenation
        # does not depend on .keys() returning a list.
        control_types = ['shutdown_request', 'abort_request']
        control_types.extend(list(self.shell_handlers))
        for msg_type in control_types:
            self.control_handlers[msg_type] = getattr(self, msg_type)

        self._initial_exec_lines()

    def _wrap_exception(self, method=None):
        """Return error content built by `wrap_exception`, tagged with this
        engine's `ident`, `int_id` and the name of the failing `method`.

        Presumably `wrap_exception` reads the exception currently being
        handled, so call this from inside an ``except`` block.
        """
        e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
        return wrap_exception(e_info)

    def _initial_exec_lines(self):
        """Run each entry of `exec_lines` through `execute_request`.

        Replies are sent to a `_Passer`, i.e. they are discarded.
        """
        sink = _Passer()
        for line in self.exec_lines:
            self.log.debug("executing initialization: %s", line)
            # A fresh content dict per line, so messages never share state.
            content = dict(silent=True, user_variable=[], user_expressions=[],
                           code=line)
            msg = self.session.msg('execute_request', content)
            self.execute_request(sink, [], msg)

    def _set_io_parent(self, parent):
        """Tell the displayhook, stdout and stderr which request is running.

        Each object is only notified if it provides ``set_parent``, so the
        kernel also works when one of them has not been replaced by a
        parent-aware object.
        """
        for target in (sys.displayhook, sys.stdout, sys.stderr):
            set_parent = getattr(target, 'set_parent', None)
            if set_parent is not None:
                set_parent(parent)

    #-------------------- control handlers -----------------------------
    def abort_queues(self):
        """Abort everything currently waiting on each (truthy) shell stream."""
        for stream in self.shell_streams:
            if stream:
                self.abort_queue(stream)

    def abort_queue(self, stream):
        """Drain `stream`, answering every pending request with status 'aborted'.

        Returns as soon as a non-blocking receive yields no message.
        """
        while True:
            idents, msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
            if msg is None:
                return

            self.log.info("Aborting:")
            self.log.info(str(msg))
            # 'foo_request' is answered with 'foo_reply'.
            reply_type = msg['msg_type'].split('_')[0] + '_reply'
            reply_msg = self.session.send(stream, reply_type,
                        content={'status' : 'aborted'}, parent=msg, ident=idents)
            self.log.debug(str(reply_msg))
            # We need to wait a bit for requests to come in.  This can probably
            # be set shorter for true asynchronous clients.
            time.sleep(0.05)

    def abort_request(self, stream, ident, parent):
        """Abort specific messages by id, or everything queued if no ids are given.

        ``parent['content']['msg_ids']`` may be a single id, a sequence of
        ids, or missing/empty.  Given ids are recorded in `aborted` so that
        `dispatch_queue` rejects them when they arrive; with no ids, all
        shell queues are drained immediately.  An 'abort_reply' with status
        'ok' is always sent.
        """
        msg_ids = parent['content'].get('msg_ids', None)
        if isinstance(msg_ids, basestring):
            msg_ids = [msg_ids]
        if not msg_ids:
            self.abort_queues()
            # Nothing to record; also keeps a missing (None) value from
            # reaching the loop below.
            msg_ids = []
        for mid in msg_ids:
            self.aborted.add(str(mid))

        content = dict(status='ok')
        reply_msg = self.session.send(stream, 'abort_reply', content=content,
                parent=parent, ident=ident)
        self.log.debug(str(reply_msg))

    def shutdown_request(self, stream, ident, parent):
        """kill ourself.  This should really be handled in an external process

        Pending requests are aborted, a 'shutdown_reply' is sent (echoing the
        request content with status 'ok', or error content if aborting
        failed), and ``sys.exit(0)`` is scheduled on the loop 1000 ms later.
        """
        try:
            self.abort_queues()
        except Exception:
            content = self._wrap_exception('shutdown')
        else:
            content = dict(parent['content'])
            content['status'] = 'ok'
        msg = self.session.send(stream, 'shutdown_reply',
                                content=content, parent=parent, ident=ident)
        self.log.debug(str(msg))
        # Delay the exit so the reply above has a chance to be sent.
        dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
        dc.start()

    def dispatch_control(self, msg):
        """Unpack a raw control-stream message and call its handler.

        Messages that fail to unpack, or whose type has no entry in
        `control_handlers`, are logged and dropped.
        """
        idents, msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.unpack_message(msg, content=True, copy=False)
        except Exception:
            self.log.error("Invalid Message", exc_info=True)
            return
        else:
            self.log.debug("Control received, %s", msg)

        handler = self.control_handlers.get(msg['msg_type'], None)
        if handler is None:
            self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg['msg_type'])
        else:
            handler(self.control_stream, idents, msg)

    #-------------------- queue helpers ------------------------------
    def check_dependencies(self, dependencies):
        """Return True if `dependencies` are satisfied according to `client`.

        `dependencies` is either falsy (always satisfied), a plain collection
        of msg_ids (all must be finished), or a pair ``('any'|'all', msg_ids)``.
        """
        if not dependencies:
            return True
        if len(dependencies) == 2 and dependencies[0] in ('any', 'all'):
            anyorall, dependencies = dependencies
        else:
            anyorall = 'all'
        results = self.client.get_results(dependencies, status_only=True)
        if results['status'] != 'ok':
            return False

        if anyorall == 'any':
            # at least one dependency must have completed
            return bool(results['completed'])
        # 'all': nothing may still be pending
        return not results['pending']

    def check_aborted(self, msg_id):
        """Return True if `msg_id` has been marked for abortion."""
        return msg_id in self.aborted

    #-------------------- queue handlers -----------------------------

    def clear_request(self, stream, idents, parent):
        """Clear our namespace."""
        self.user_ns = {}
        self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
                content=dict(status='ok'))
        # Re-run the startup lines in the fresh namespace.
        self._initial_exec_lines()

    def execute_request(self, stream, ident, parent):
        """Compile and run ``parent['content']['code']`` in `user_ns`.

        The code is published as 'pyin' on the iopub stream; a failure is
        published as 'pyerr'.  An 'execute_reply' is sent on `stream`, and if
        it reports an error the remaining shell queues are aborted.
        """
        self.log.debug('execute request %s', parent)
        try:
            code = parent[u'content'][u'code']
        except Exception:
            self.log.error("Got bad msg: %s", parent, exc_info=True)
            return
        self.session.send(self.iopub_stream, u'pyin', {u'code':code}, parent=parent,
                            ident=asbytes('%s.pyin'%self.prefix))
        started = datetime.now()
        try:
            comp_code = self.compiler(code, '<zmq-kernel>')
            self._set_io_parent(parent)
            # Tuple form of exec: valid as a Python 2 statement and as the
            # Python 3 function.
            exec(comp_code, self.user_ns, self.user_ns)
        except:
            # Deliberately bare: whatever user code raises is reported to the
            # client instead of propagating out of the handler.
            exc_content = self._wrap_exception('execute')
            self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
                            ident=asbytes('%s.pyerr'%self.prefix))
            reply_content = exc_content
        else:
            reply_content = {'status' : 'ok'}

        reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
                    ident=ident, subheader = dict(started=started))
        self.log.debug(str(reply_msg))
        if reply_msg['content']['status'] == u'error':
            self.abort_queues()

    def complete_request(self, stream, ident, parent):
        """Reply on `stream` with the completions for `parent`."""
        matches = {'matches' : self.complete(parent),
                   'status' : 'ok'}
        self.session.send(stream, 'complete_reply',
                          content=matches, parent=parent, ident=ident)

    def complete(self, msg):
        """Return the completer's matches for the line/text in `msg` content."""
        # Item access, consistent with how every other handler reads messages.
        content = msg['content']
        return self.completer.complete(content['line'], content['text'])

    def apply_request(self, stream, ident, parent):
        """Call the function packed in ``parent['buffers']`` and reply with its result.

        The callable and its arguments are unpacked with
        `unpack_apply_message`, bound to temporary names in `user_ns`, and
        invoked there.  The serialized result travels in the reply's buffers.
        A failure is published as 'pyerr' on iopub and returned as the reply
        content.  The reply subheader carries 'dependencies_met', 'engine',
        'started' and 'status'.
        """
        # flush previous reply, so this request won't block it
        stream.flush(zmq.POLLOUT)
        try:
            # 'content' is not used below; the lookup only checks the message
            # has the expected shape.
            content = parent[u'content']
            bufs = parent[u'buffers']
            msg_id = parent['header']['msg_id']
        except Exception:
            self.log.error("Got bad msg: %s", parent, exc_info=True)
            return

        sub = {'dependencies_met' : True, 'engine' : self.ident,
                'started': datetime.now()}
        try:
            self._set_io_parent(parent)
            working = self.user_ns
            # Temporary names are derived from the msg_id (dashes stripped).
            prefix = "_"+str(msg_id).replace("-","")+"_"

            f, args, kwargs = unpack_apply_message(bufs, working, copy=False)

            fname = prefix+"f"
            argname = prefix+"args"
            kwargname = prefix+"kwargs"
            resultname = prefix+"result"

            ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
            working.update(ns)
            code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
            try:
                exec(code, working, working)
                result = working.get(resultname)
            finally:
                # Always remove the temporaries.  A default is passed so that
                # a name already removed by the called code cannot raise here
                # and hide the real outcome.
                for key in ns:
                    working.pop(key, None)

            packed_result, buf = serialize_object(result)
            result_buf = [packed_result]+buf
        except:
            # Deliberately bare: whatever the applied function raises is
            # reported to the client instead of propagating.
            exc_content = self._wrap_exception('apply')
            self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
                                ident=asbytes('%s.pyerr'%self.prefix))
            reply_content = exc_content
            result_buf = []

            if exc_content['ename'] == 'UnmetDependency':
                sub['dependencies_met'] = False
        else:
            reply_content = {'status' : 'ok'}

        # put 'ok'/'error' status in header, for scheduler introspection:
        sub['status'] = reply_content['status']

        self.session.send(stream, u'apply_reply', reply_content,
                    parent=parent, ident=ident, buffers=result_buf, subheader=sub)

        # flush i/o
        # should this be before reply_msg is sent, like in the single-kernel code,
        # or should nothing get in the way of real results?
        sys.stdout.flush()
        sys.stderr.flush()

    def dispatch_queue(self, stream, msg):
        """Unpack a raw shell-stream message and call its handler.

        The control stream is flushed first.  A message whose id is in
        `aborted` is answered with status 'aborted' and not handled.
        Messages that fail to unpack or have an unknown type are logged and
        dropped.
        """
        self.control_stream.flush()
        idents, msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.unpack_message(msg, content=True, copy=False)
        except Exception:
            self.log.error("Invalid Message", exc_info=True)
            return
        else:
            self.log.debug("Message received, %s", msg)

        msg_id = msg['header']['msg_id']
        if self.check_aborted(msg_id):
            self.aborted.remove(msg_id)
            # is it safe to assume a msg_id will not be resubmitted?
            reply_type = msg['msg_type'].split('_')[0] + '_reply'
            status = {'status' : 'aborted'}
            self.session.send(stream, reply_type, subheader=status,
                        content=status, parent=msg, ident=idents)
            return
        handler = self.shell_handlers.get(msg['msg_type'], None)
        if handler is None:
            self.log.error("UNKNOWN MESSAGE TYPE: %r", msg['msg_type'])
        else:
            handler(stream, idents, msg)

    def start(self):
        """Register the receive and error callbacks on all streams."""
        if self.control_stream:
            self.control_stream.on_recv(self.dispatch_control, copy=False)
            self.control_stream.on_err(printer)

        def make_dispatcher(stream):
            # A factory, so each callback is bound to its own stream rather
            # than to the loop variable.
            def dispatcher(msg):
                return self.dispatch_queue(stream, msg)
            return dispatcher

        for s in self.shell_streams:
            s.on_recv(make_dispatcher(s), copy=False)
            s.on_err(printer)

        if self.iopub_stream:
            self.iopub_stream.on_err(printer)