##// END OF EJS Templates
API update involving map and load-balancing
API update involving map and load-balancing

File last commit:

r3631:d72427c6
r3635:498a93f1
Show More
streamkernel.py
487 lines | 17.9 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""
Kernel adapted from kernel.py to use ZMQ Streams
"""
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
# Standard library imports.
from __future__ import print_function
import __builtin__
import logging
import os
import sys
import time
import traceback
from code import CommandCompiler
from datetime import datetime
from pprint import pprint
from signal import SIGTERM, SIGKILL
# System library imports.
import zmq
from zmq.eventloop import ioloop, zmqstream
# Local imports.
from IPython.core import ultratb
from IPython.utils.traitlets import HasTraits, Instance, List, Int, Dict, Set, Str
from IPython.zmq.completer import KernelCompleter
from IPython.zmq.iostream import OutStream
from IPython.zmq.displayhook import DisplayHook
import heartmonitor
from client import Client
from factory import SessionFactory
from streamsession import StreamSession, Message, extract_header, serialize_object,\
unpack_apply_message, ISO8601, wrap_exception
def printer(*args):
pprint(args, stream=sys.__stdout__)
class _Passer:
"""Empty class that implements `send()` that does nothing."""
def send(self, *args, **kwargs):
pass
send_multipart = send
#-----------------------------------------------------------------------------
# Main kernel class
#-----------------------------------------------------------------------------
class Kernel(SessionFactory):
#---------------------------------------------------------------------------
# Kernel interface
#---------------------------------------------------------------------------
# kwargs:
int_id = Int(-1, config=True)
user_ns = Dict(config=True)
exec_lines = List(config=True)
control_stream = Instance(zmqstream.ZMQStream)
task_stream = Instance(zmqstream.ZMQStream)
iopub_stream = Instance(zmqstream.ZMQStream)
client = Instance('IPython.zmq.parallel.client.Client')
# internals
shell_streams = List()
compiler = Instance(CommandCompiler, (), {})
completer = Instance(KernelCompleter)
aborted = Set()
shell_handlers = Dict()
control_handlers = Dict()
def _set_prefix(self):
self.prefix = "engine.%s"%self.int_id
def _connect_completer(self):
self.completer = KernelCompleter(self.user_ns)
def __init__(self, **kwargs):
super(Kernel, self).__init__(**kwargs)
self._set_prefix()
self._connect_completer()
self.on_trait_change(self._set_prefix, 'id')
self.on_trait_change(self._connect_completer, 'user_ns')
# Build dict of handlers for message types
for msg_type in ['execute_request', 'complete_request', 'apply_request',
'clear_request']:
self.shell_handlers[msg_type] = getattr(self, msg_type)
for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
self.control_handlers[msg_type] = getattr(self, msg_type)
self._initial_exec_lines()
def _wrap_exception(self, method=None):
e_info = dict(engineid=self.ident, method=method)
content=wrap_exception(e_info)
return content
def _initial_exec_lines(self):
s = _Passer()
content = dict(silent=True, user_variable=[],user_expressions=[])
for line in self.exec_lines:
self.log.debug("executing initialization: %s"%line)
content.update({'code':line})
msg = self.session.msg('execute_request', content)
self.execute_request(s, [], msg)
#-------------------- control handlers -----------------------------
def abort_queues(self):
for stream in self.shell_streams:
if stream:
self.abort_queue(stream)
def abort_queue(self, stream):
while True:
try:
msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
break
else:
return
else:
if msg is None:
return
else:
idents,msg = msg
# assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
# msg = self.reply_socket.recv_json()
self.log.info("Aborting:")
self.log.info(str(msg))
msg_type = msg['msg_type']
reply_type = msg_type.split('_')[0] + '_reply'
# reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
# self.reply_socket.send(ident,zmq.SNDMORE)
# self.reply_socket.send_json(reply_msg)
reply_msg = self.session.send(stream, reply_type,
content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
self.log.debug(str(reply_msg))
# We need to wait a bit for requests to come in. This can probably
# be set shorter for true asynchronous clients.
time.sleep(0.05)
def abort_request(self, stream, ident, parent):
"""abort a specifig msg by id"""
msg_ids = parent['content'].get('msg_ids', None)
if isinstance(msg_ids, basestring):
msg_ids = [msg_ids]
if not msg_ids:
self.abort_queues()
for mid in msg_ids:
self.aborted.add(str(mid))
content = dict(status='ok')
reply_msg = self.session.send(stream, 'abort_reply', content=content,
parent=parent, ident=ident)[0]
self.log.debug(str(reply_msg))
def shutdown_request(self, stream, ident, parent):
"""kill ourself. This should really be handled in an external process"""
try:
self.abort_queues()
except:
content = self._wrap_exception('shutdown')
else:
content = dict(parent['content'])
content['status'] = 'ok'
msg = self.session.send(stream, 'shutdown_reply',
content=content, parent=parent, ident=ident)
# msg = self.session.send(self.pub_socket, 'shutdown_reply',
# content, parent, ident)
# print >> sys.__stdout__, msg
# time.sleep(0.2)
dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
dc.start()
def dispatch_control(self, msg):
idents,msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.unpack_message(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return
header = msg['header']
msg_id = header['msg_id']
handler = self.control_handlers.get(msg['msg_type'], None)
if handler is None:
self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
else:
handler(self.control_stream, idents, msg)
#-------------------- queue helpers ------------------------------
def check_dependencies(self, dependencies):
if not dependencies:
return True
if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
anyorall = dependencies[0]
dependencies = dependencies[1]
else:
anyorall = 'all'
results = self.client.get_results(dependencies,status_only=True)
if results['status'] != 'ok':
return False
if anyorall == 'any':
if not results['completed']:
return False
else:
if results['pending']:
return False
return True
def check_aborted(self, msg_id):
return msg_id in self.aborted
#-------------------- queue handlers -----------------------------
def clear_request(self, stream, idents, parent):
"""Clear our namespace."""
self.user_ns = {}
msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
content = dict(status='ok'))
self._initial_exec_lines()
def execute_request(self, stream, ident, parent):
self.log.debug('execute request %s'%parent)
try:
code = parent[u'content'][u'code']
except:
self.log.error("Got bad msg: %s"%parent, exc_info=True)
return
self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
ident='%s.pyin'%self.prefix)
started = datetime.now().strftime(ISO8601)
try:
comp_code = self.compiler(code, '<zmq-kernel>')
# allow for not overriding displayhook
if hasattr(sys.displayhook, 'set_parent'):
sys.displayhook.set_parent(parent)
sys.stdout.set_parent(parent)
sys.stderr.set_parent(parent)
exec comp_code in self.user_ns, self.user_ns
except:
exc_content = self._wrap_exception('execute')
# exc_msg = self.session.msg(u'pyerr', exc_content, parent)
self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
ident='%s.pyerr'%self.prefix)
reply_content = exc_content
else:
reply_content = {'status' : 'ok'}
reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
ident=ident, subheader = dict(started=started))
self.log.debug(str(reply_msg))
if reply_msg['content']['status'] == u'error':
self.abort_queues()
def complete_request(self, stream, ident, parent):
matches = {'matches' : self.complete(parent),
'status' : 'ok'}
completion_msg = self.session.send(stream, 'complete_reply',
matches, parent, ident)
# print >> sys.__stdout__, completion_msg
def complete(self, msg):
return self.completer.complete(msg.content.line, msg.content.text)
def apply_request(self, stream, ident, parent):
# flush previous reply, so this request won't block it
stream.flush(zmq.POLLOUT)
try:
content = parent[u'content']
bufs = parent[u'buffers']
msg_id = parent['header']['msg_id']
bound = content.get('bound', False)
except:
self.log.error("Got bad msg: %s"%parent, exc_info=True)
return
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
# self.iopub_stream.send(pyin_msg)
# self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
sub = {'dependencies_met' : True, 'engine' : self.ident,
'started': datetime.now().strftime(ISO8601)}
try:
# allow for not overriding displayhook
if hasattr(sys.displayhook, 'set_parent'):
sys.displayhook.set_parent(parent)
sys.stdout.set_parent(parent)
sys.stderr.set_parent(parent)
# exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
if bound:
working = self.user_ns
suffix = str(msg_id).replace("-","")
prefix = "_"
else:
working = dict()
suffix = prefix = "_" # prevent keyword collisions with lambda
f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
# if f.fun
fname = getattr(f, '__name__', 'f')
fname = prefix+fname.strip('<>')+suffix
argname = prefix+"args"+suffix
kwargname = prefix+"kwargs"+suffix
resultname = prefix+"result"+suffix
ns = { fname : f, argname : args, kwargname : kwargs }
# print ns
working.update(ns)
code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
exec code in working, working
result = working.get(resultname)
# clear the namespace
if bound:
for key in ns.iterkeys():
self.user_ns.pop(key)
else:
del working
packed_result,buf = serialize_object(result)
result_buf = [packed_result]+buf
except:
exc_content = self._wrap_exception('apply')
# exc_msg = self.session.msg(u'pyerr', exc_content, parent)
self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
ident='%s.pyerr'%self.prefix)
reply_content = exc_content
result_buf = []
if exc_content['ename'] == 'UnmetDependency':
sub['dependencies_met'] = False
else:
reply_content = {'status' : 'ok'}
# put 'ok'/'error' status in header, for scheduler introspection:
sub['status'] = reply_content['status']
reply_msg = self.session.send(stream, u'apply_reply', reply_content,
parent=parent, ident=ident,buffers=result_buf, subheader=sub)
# if reply_msg['content']['status'] == u'error':
# self.abort_queues()
def dispatch_queue(self, stream, msg):
self.control_stream.flush()
idents,msg = self.session.feed_identities(msg, copy=False)
try:
msg = self.session.unpack_message(msg, content=True, copy=False)
except:
self.log.error("Invalid Message", exc_info=True)
return
header = msg['header']
msg_id = header['msg_id']
if self.check_aborted(msg_id):
self.aborted.remove(msg_id)
# is it safe to assume a msg_id will not be resubmitted?
reply_type = msg['msg_type'].split('_')[0] + '_reply'
reply_msg = self.session.send(stream, reply_type,
content={'status' : 'aborted'}, parent=msg, ident=idents)
return
handler = self.shell_handlers.get(msg['msg_type'], None)
if handler is None:
self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
else:
handler(stream, idents, msg)
def start(self):
#### stream mode:
if self.control_stream:
self.control_stream.on_recv(self.dispatch_control, copy=False)
self.control_stream.on_err(printer)
def make_dispatcher(stream):
def dispatcher(msg):
return self.dispatch_queue(stream, msg)
return dispatcher
for s in self.shell_streams:
s.on_recv(make_dispatcher(s), copy=False)
s.on_err(printer)
if self.iopub_stream:
self.iopub_stream.on_err(printer)
#### while True mode:
# while True:
# idle = True
# try:
# msg = self.shell_stream.socket.recv_multipart(
# zmq.NOBLOCK, copy=False)
# except zmq.ZMQError, e:
# if e.errno != zmq.EAGAIN:
# raise e
# else:
# idle=False
# self.dispatch_queue(self.shell_stream, msg)
#
# if not self.task_stream.empty():
# idle=False
# msg = self.task_stream.recv_multipart()
# self.dispatch_queue(self.task_stream, msg)
# if idle:
# # don't busywait
# time.sleep(1e-3)
def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
client_addr=None, loop=None, context=None, key=None,
out_stream_factory=OutStream, display_hook_factory=DisplayHook):
"""NO LONGER IN USE"""
# create loop, context, and session:
if loop is None:
loop = ioloop.IOLoop.instance()
if context is None:
context = zmq.Context()
c = context
session = StreamSession(key=key)
# print (session.key)
# print (control_addr, shell_addrs, iopub_addr, hb_addrs)
# create Control Stream
control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
control_stream.setsockopt(zmq.IDENTITY, identity)
control_stream.connect(control_addr)
# create Shell Streams (MUX, Task, etc.):
shell_streams = []
for addr in shell_addrs:
stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
stream.setsockopt(zmq.IDENTITY, identity)
stream.connect(addr)
shell_streams.append(stream)
# create iopub stream:
iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop)
iopub_stream.setsockopt(zmq.IDENTITY, identity)
iopub_stream.connect(iopub_addr)
# Redirect input streams and set a display hook.
if out_stream_factory:
sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
sys.stdout.topic = 'engine.%i.stdout'%int_id
sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
sys.stderr.topic = 'engine.%i.stderr'%int_id
if display_hook_factory:
sys.displayhook = display_hook_factory(session, iopub_stream)
sys.displayhook.topic = 'engine.%i.pyout'%int_id
# launch heartbeat
heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
heart.start()
# create (optional) Client
if client_addr:
client = Client(client_addr, username=identity)
else:
client = None
kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
shell_streams=shell_streams, iopub_stream=iopub_stream,
client=client, loop=loop)
kernel.start()
return loop, c, kernel