"""
Kernel adapted from kernel.py to use ZMQ Streams
Authors:
* Min RK
* Brian Granger
* Fernando Perez
* Evan Patterson
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------

# Standard library imports.
from __future__ import print_function

import sys
import time

from code import CommandCompiler
from datetime import datetime
from pprint import pprint

# System library imports.
import zmq
from zmq.eventloop import ioloop, zmqstream

# Local imports.
from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode, CBytes
from IPython.zmq.completer import KernelCompleter

from IPython.parallel.error import wrap_exception
from IPython.parallel.factory import SessionFactory
from IPython.parallel.util import serialize_object, unpack_apply_message, asbytes

def printer(*args):
    pprint(args, stream=sys.__stdout__)

class _Passer(zmqstream.ZMQStream):
    """Empty class that implements `send()` that does nothing.

    Subclass ZMQStream for Session typechecking
    """
    def __init__(self, *args, **kwargs):
        pass

    def send(self, *args, **kwargs):
        pass
    send_multipart = send

#-----------------------------------------------------------------------------
# Main kernel class
#-----------------------------------------------------------------------------

class Kernel(SessionFactory):

    #---------------------------------------------------------------------------
    # Kernel interface
    #---------------------------------------------------------------------------

    # kwargs:
    exec_lines = List(Unicode, config=True,
        help="List of lines to execute")

    # identities:
    int_id = Int(-1)
    bident = CBytes()
    ident = Unicode()
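    # whenever `ident` changes, mirror it into `bident` (ZMQ routing
    # identities are byte strings, so the unicode ident is converted).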
    def _ident_changed(self, name, old, new):
        self.bident = asbytes(new)

    user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")

    control_stream = Instance(zmqstream.ZMQStream)
    task_stream = Instance(zmqstream.ZMQStream)
    iopub_stream = Instance(zmqstream.ZMQStream)
    client = Instance('IPython.parallel.Client')

    # internals
    shell_streams = List()
    compiler = Instance(CommandCompiler, (), {})
    completer = Instance(KernelCompleter)

    aborted = Set()
    shell_handlers = Dict()
    control_handlers = Dict()

    def _set_prefix(self):
        self.prefix = "engine.%s"%self.int_id

    def _connect_completer(self):
        self.completer = KernelCompleter(self.user_ns)

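    # __init__ calls the two helpers above once, re-registers them as
    # trait-change callbacks, and builds the msg_type -> bound-method tables
    # used by dispatch_control/dispatch_queue to route incoming requests.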
    def __init__(self, **kwargs):
        super(Kernel, self).__init__(**kwargs)
        self._set_prefix()
        self._connect_completer()

        self.on_trait_change(self._set_prefix, 'id')
        self.on_trait_change(self._connect_completer, 'user_ns')

        # Build dict of handlers for message types
        for msg_type in ['execute_request', 'complete_request', 'apply_request',
                'clear_request']:
            self.shell_handlers[msg_type] = getattr(self, msg_type)

        for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
            self.control_handlers[msg_type] = getattr(self, msg_type)

        self._initial_exec_lines()

    def _wrap_exception(self, method=None):
        e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
        content=wrap_exception(e_info)
        return content

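    # run any configured exec_lines through the normal execute_request
    # handler, with a _Passer stream so the generated replies go nowhere.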
    def _initial_exec_lines(self):
        s = _Passer()
        content = dict(silent=True, user_variables=[],user_expressions=[])
        for line in self.exec_lines:
            self.log.debug("executing initialization: %s"%line)
            content.update({'code':line})
            msg = self.session.msg('execute_request', content)
            self.execute_request(s, [], msg)

    #-------------------- control handlers -----------------------------

    def abort_queues(self):
        for stream in self.shell_streams:
            if stream:
                self.abort_queue(stream)

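    # drain a shell stream without executing anything: every request still
    # queued on it is answered with a '<msg_type>_reply' of status 'aborted'.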
    def abort_queue(self, stream):
        while True:
            idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True)
            if msg is None:
                return

            self.log.info("Aborting:")
            self.log.info(str(msg))
            msg_type = msg['header']['msg_type']
            reply_type = msg_type.split('_')[0] + '_reply'
            # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
            # self.reply_socket.send(ident,zmq.SNDMORE)
            # self.reply_socket.send_json(reply_msg)
            reply_msg = self.session.send(stream, reply_type,
                    content={'status' : 'aborted'}, parent=msg, ident=idents)
            self.log.debug(str(reply_msg))
            # We need to wait a bit for requests to come in. This can probably
            # be set shorter for true asynchronous clients.
            time.sleep(0.05)

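    # 'msg_ids' may be a single id or a list; an empty or missing value
    # aborts everything currently queued via abort_queues().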
    def abort_request(self, stream, ident, parent):
        """abort a specific msg by id"""
        msg_ids = parent['content'].get('msg_ids', None)
        if isinstance(msg_ids, basestring):
            msg_ids = [msg_ids]
        if not msg_ids:
            self.abort_queues()
        for mid in msg_ids:
            self.aborted.add(str(mid))

        content = dict(status='ok')
        reply_msg = self.session.send(stream, 'abort_reply', content=content,
                parent=parent, ident=ident)
        self.log.debug(str(reply_msg))

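    # reply first, then exit the process a second later via a DelayedCallback
    # so the shutdown_reply has a chance to be delivered.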
    def shutdown_request(self, stream, ident, parent):
        """kill ourselves. This should really be handled in an external process"""
        try:
            self.abort_queues()
        except:
            content = self._wrap_exception('shutdown')
        else:
            content = dict(parent['content'])
            content['status'] = 'ok'

        msg = self.session.send(stream, 'shutdown_reply',
                content=content, parent=parent, ident=ident)
        self.log.debug(str(msg))
        dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
        dc.start()

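    # control messages arrive as multipart ZMQ frames: feed_identities splits
    # off the routing prefix, unserialize rebuilds the message dict, and the
    # handler is looked up in control_handlers by msg_type.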
    def dispatch_control(self, msg):
        idents,msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.unserialize(msg, content=True, copy=False)
        except:
            self.log.error("Invalid Message", exc_info=True)
            return
        else:
            self.log.debug("Control received, %s", msg)

        header = msg['header']
        msg_id = header['msg_id']
        msg_type = header['msg_type']

        handler = self.control_handlers.get(msg_type, None)
        if handler is None:
            self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg_type)
        else:
            handler(self.control_stream, idents, msg)

    #-------------------- queue helpers ------------------------------

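    # `dependencies` is either a flat list of msg_ids (treated as 'all'), or
    # a two-element ['any'|'all', msg_ids] pair, checked against results
    # fetched through self.client.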
    def check_dependencies(self, dependencies):
        if not dependencies:
            return True

        if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
            anyorall = dependencies[0]
            dependencies = dependencies[1]
        else:
            anyorall = 'all'
        results = self.client.get_results(dependencies,status_only=True)
        if results['status'] != 'ok':
            return False

        if anyorall == 'any':
            if not results['completed']:
                return False
        else:
            if results['pending']:
                return False

        return True
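        # e.g. ['any', [id1, id2]] is met as soon as either result has
        # completed, while a plain list of msg_ids requires all of them.
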
    def check_aborted(self, msg_id):
        return msg_id in self.aborted

    #-------------------- queue handlers -----------------------------

    def clear_request(self, stream, idents, parent):
        """Clear our namespace."""
        self.user_ns = {}
        msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
                content = dict(status='ok'))
        self._initial_exec_lines()

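    # execute_request: publish the code as 'pyin' on iopub, exec it in
    # user_ns, publish any failure as 'pyerr', then send an execute_reply
    # whose subheader carries the 'started' timestamp.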
    def execute_request(self, stream, ident, parent):
        self.log.debug('execute request %s'%parent)
        try:
            code = parent[u'content'][u'code']
        except:
            self.log.error("Got bad msg: %s"%parent, exc_info=True)
            return
        self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
                            ident=asbytes('%s.pyin'%self.prefix))
        started = datetime.now()
        try:
            comp_code = self.compiler(code, '<zmq-kernel>')
            # allow for not overriding displayhook
            if hasattr(sys.displayhook, 'set_parent'):
                sys.displayhook.set_parent(parent)
                sys.stdout.set_parent(parent)
                sys.stderr.set_parent(parent)
            exec comp_code in self.user_ns, self.user_ns
        except:
            exc_content = self._wrap_exception('execute')
            # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
            self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
                            ident=asbytes('%s.pyerr'%self.prefix))
            reply_content = exc_content
        else:
            reply_content = {'status' : 'ok'}

        reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
                    ident=ident, subheader = dict(started=started))
        self.log.debug(str(reply_msg))
        if reply_msg['content']['status'] == u'error':
            self.abort_queues()

    def complete_request(self, stream, ident, parent):
        matches = {'matches' : self.complete(parent),
                   'status' : 'ok'}
        completion_msg = self.session.send(stream, 'complete_reply',
                                           matches, parent, ident)
        # print >> sys.__stdout__, completion_msg

    def complete(self, msg):
        return self.completer.complete(msg.content.line, msg.content.text)

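    # apply_request: unpack the serialized f/args/kwargs from the message
    # buffers into user_ns under msg_id-prefixed temporary names, exec a
    # one-line call there, then serialize the result into the reply buffers.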
    def apply_request(self, stream, ident, parent):
        # flush previous reply, so this request won't block it
        stream.flush(zmq.POLLOUT)
        try:
            content = parent[u'content']
            bufs = parent[u'buffers']
            msg_id = parent['header']['msg_id']
            # bound = parent['header'].get('bound', False)
        except:
            self.log.error("Got bad msg: %s"%parent, exc_info=True)
            return
        # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
        # self.iopub_stream.send(pyin_msg)
        # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
        sub = {'dependencies_met' : True, 'engine' : self.ident,
               'started': datetime.now()}
        try:
            # allow for not overriding displayhook
            if hasattr(sys.displayhook, 'set_parent'):
                sys.displayhook.set_parent(parent)
                sys.stdout.set_parent(parent)
                sys.stderr.set_parent(parent)
            # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
            working = self.user_ns
            # suffix =
            prefix = "_"+str(msg_id).replace("-","")+"_"

            f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
            # if bound:
            #     bound_ns = Namespace(working)
            #     args = [bound_ns]+list(args)

            fname = getattr(f, '__name__', 'f')

            fname = prefix+"f"
            argname = prefix+"args"
            kwargname = prefix+"kwargs"
            resultname = prefix+"result"

            ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
            # print ns
            working.update(ns)
            code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
            try:
                exec code in working,working
                result = working.get(resultname)
            finally:
                for key in ns.iterkeys():
                    working.pop(key)
            # if bound:
            #     working.update(bound_ns)

            packed_result,buf = serialize_object(result)
            result_buf = [packed_result]+buf
        except:
            exc_content = self._wrap_exception('apply')
            # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
            self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
                                ident=asbytes('%s.pyerr'%self.prefix))
            reply_content = exc_content
            result_buf = []

            if exc_content['ename'] == 'UnmetDependency':
                sub['dependencies_met'] = False
        else:
            reply_content = {'status' : 'ok'}

        # put 'ok'/'error' status in header, for scheduler introspection:
        sub['status'] = reply_content['status']

        reply_msg = self.session.send(stream, u'apply_reply', reply_content,
                    parent=parent, ident=ident,buffers=result_buf, subheader=sub)

        # flush i/o
        # should this be before reply_msg is sent, like in the single-kernel code,
        # or should nothing get in the way of real results?
        sys.stdout.flush()
        sys.stderr.flush()

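    # shell-channel counterpart of dispatch_control: the control stream is
    # flushed first (presumably so pending aborts are seen before new work),
    # and aborted msg_ids get an 'aborted' reply instead of a handler call.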
    def dispatch_queue(self, stream, msg):
        self.control_stream.flush()
        idents,msg = self.session.feed_identities(msg, copy=False)
        try:
            msg = self.session.unserialize(msg, content=True, copy=False)
        except:
            self.log.error("Invalid Message", exc_info=True)
            return
        else:
            self.log.debug("Message received, %s", msg)

        header = msg['header']
        msg_id = header['msg_id']
        msg_type = msg['header']['msg_type']
        if self.check_aborted(msg_id):
            self.aborted.remove(msg_id)
            # is it safe to assume a msg_id will not be resubmitted?
            reply_type = msg_type.split('_')[0] + '_reply'
            status = {'status' : 'aborted'}
            reply_msg = self.session.send(stream, reply_type, subheader=status,
                        content=status, parent=msg, ident=idents)
            return
        handler = self.shell_handlers.get(msg_type, None)
        if handler is None:
            self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg_type)
        else:
            handler(stream, idents, msg)

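    # start() registers everything with the IOLoop: the control stream feeds
    # dispatch_control, each shell stream gets its own dispatcher closure
    # from make_dispatcher, and every stream gets printer as error callback.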
    def start(self):
        #### stream mode:
        if self.control_stream:
            self.control_stream.on_recv(self.dispatch_control, copy=False)
            self.control_stream.on_err(printer)

        def make_dispatcher(stream):
            def dispatcher(msg):
                return self.dispatch_queue(stream, msg)
            return dispatcher

        for s in self.shell_streams:
            s.on_recv(make_dispatcher(s), copy=False)
            s.on_err(printer)

        if self.iopub_stream:
            self.iopub_stream.on_err(printer)

        #### while True mode:
        # while True:
        #     idle = True
        #     try:
        #         msg = self.shell_stream.socket.recv_multipart(
        #                     zmq.NOBLOCK, copy=False)
        #     except zmq.ZMQError, e:
        #         if e.errno != zmq.EAGAIN:
        #             raise e
        #     else:
        #         idle=False
        #         self.dispatch_queue(self.shell_stream, msg)
        #
        #     if not self.task_stream.empty():
        #         idle=False
        #         msg = self.task_stream.recv_multipart()
        #         self.dispatch_queue(self.task_stream, msg)
        #     if idle:
        #         # don't busywait
        #         time.sleep(1e-3)