#!/usr/bin/env python """ Kernel adapted from kernel.py to use ZMQ Streams """ #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- # Standard library imports. from __future__ import print_function import __builtin__ from code import CommandCompiler import os import sys import time import traceback from datetime import datetime from signal import SIGTERM, SIGKILL from pprint import pprint # System library imports. import zmq from zmq.eventloop import ioloop, zmqstream # Local imports. from IPython.core import ultratb from IPython.utils.traitlets import HasTraits, Instance, List from IPython.zmq.completer import KernelCompleter from IPython.zmq.log import logger # a Logger object from IPython.zmq.iostream import OutStream from IPython.zmq.displayhook import DisplayHook from streamsession import StreamSession, Message, extract_header, serialize_object,\ unpack_apply_message, ISO8601, wrap_exception from dependency import UnmetDependency import heartmonitor from client import Client def printer(*args): pprint(args, stream=sys.__stdout__) #----------------------------------------------------------------------------- # Main kernel class #----------------------------------------------------------------------------- class Kernel(HasTraits): #--------------------------------------------------------------------------- # Kernel interface #--------------------------------------------------------------------------- session = Instance(StreamSession) shell_streams = Instance(list) control_stream = Instance(zmqstream.ZMQStream) task_stream = Instance(zmqstream.ZMQStream) iopub_stream = Instance(zmqstream.ZMQStream) client = Instance(Client) loop = Instance(ioloop.IOLoop) def __init__(self, **kwargs): super(Kernel, self).__init__(**kwargs) self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) self.prefix = 'engine.%s'%self.identity self.user_ns = {} self.history = [] self.compiler = CommandCompiler() self.completer = KernelCompleter(self.user_ns) self.aborted = set() # Build dict of handlers for message types self.shell_handlers = {} self.control_handlers = {} for msg_type in ['execute_request', 'complete_request', 'apply_request', 'clear_request']: self.shell_handlers[msg_type] = getattr(self, msg_type) for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys(): self.control_handlers[msg_type] = getattr(self, msg_type) def _wrap_exception(self, method=None): e_info = dict(engineid=self.identity, method=method) content=wrap_exception(e_info) return content #-------------------- control handlers ----------------------------- def abort_queues(self): for stream in self.shell_streams: if stream: self.abort_queue(stream) def abort_queue(self, stream): while True: try: msg = self.session.recv(stream, zmq.NOBLOCK,content=True) except zmq.ZMQError as e: if e.errno == zmq.EAGAIN: break else: return else: if msg is None: return else: idents,msg = msg # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." # msg = self.reply_socket.recv_json() print ("Aborting:", file=sys.__stdout__) print (Message(msg), file=sys.__stdout__) msg_type = msg['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) # self.reply_socket.send(ident,zmq.SNDMORE) # self.reply_socket.send_json(reply_msg) reply_msg = self.session.send(stream, reply_type, content={'status' : 'aborted'}, parent=msg, ident=idents)[0] print(Message(reply_msg), file=sys.__stdout__) # We need to wait a bit for requests to come in. This can probably # be set shorter for true asynchronous clients. time.sleep(0.05) def abort_request(self, stream, ident, parent): """abort a specifig msg by id""" msg_ids = parent['content'].get('msg_ids', None) if isinstance(msg_ids, basestring): msg_ids = [msg_ids] if not msg_ids: self.abort_queues() for mid in msg_ids: self.aborted.add(str(mid)) content = dict(status='ok') reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent, ident=ident)[0] print(Message(reply_msg), file=sys.__stdout__) def shutdown_request(self, stream, ident, parent): """kill ourself. This should really be handled in an external process""" try: self.abort_queues() except: content = self._wrap_exception('shutdown') else: content = dict(parent['content']) content['status'] = 'ok' msg = self.session.send(stream, 'shutdown_reply', content=content, parent=parent, ident=ident) # msg = self.session.send(self.pub_socket, 'shutdown_reply', # content, parent, ident) # print >> sys.__stdout__, msg # time.sleep(0.2) dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop) dc.start() def dispatch_control(self, msg): idents,msg = self.session.feed_identities(msg, copy=False) try: msg = self.session.unpack_message(msg, content=True, copy=False) except: logger.error("Invalid Message", exc_info=True) return header = msg['header'] msg_id = header['msg_id'] handler = self.control_handlers.get(msg['msg_type'], None) if handler is None: print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__) else: handler(self.control_stream, idents, msg) #-------------------- queue helpers ------------------------------ def check_dependencies(self, dependencies): if not dependencies: return True if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): anyorall = dependencies[0] dependencies = dependencies[1] else: anyorall = 'all' results = self.client.get_results(dependencies,status_only=True) if results['status'] != 'ok': return False if anyorall == 'any': if not results['completed']: return False else: if results['pending']: return False return True def check_aborted(self, msg_id): return msg_id in self.aborted #-------------------- queue handlers ----------------------------- def clear_request(self, stream, idents, parent): """Clear our namespace.""" self.user_ns = {} msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, content = dict(status='ok')) def execute_request(self, stream, ident, parent): try: code = parent[u'content'][u'code'] except: print("Got bad msg: ", file=sys.__stderr__) print(Message(parent), file=sys.__stderr__) return # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) # self.iopub_stream.send(pyin_msg) self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent, ident=self.identity+'.pyin') started = datetime.now().strftime(ISO8601) try: comp_code = self.compiler(code, '') # allow for not overriding displayhook if hasattr(sys.displayhook, 'set_parent'): sys.displayhook.set_parent(parent) sys.stdout.set_parent(parent) sys.stderr.set_parent(parent) exec comp_code in self.user_ns, self.user_ns except: exc_content = self._wrap_exception('execute') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, ident=self.identity+'.pyerr') reply_content = exc_content else: reply_content = {'status' : 'ok'} # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) # self.reply_socket.send(ident, zmq.SNDMORE) # self.reply_socket.send_json(reply_msg) reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident, subheader = dict(started=started)) print(Message(reply_msg), file=sys.__stdout__) if reply_msg['content']['status'] == u'error': self.abort_queues() def complete_request(self, stream, ident, parent): matches = {'matches' : self.complete(parent), 'status' : 'ok'} completion_msg = self.session.send(stream, 'complete_reply', matches, parent, ident) # print >> sys.__stdout__, completion_msg def complete(self, msg): return self.completer.complete(msg.content.line, msg.content.text) def apply_request(self, stream, ident, parent): # print (parent) try: content = parent[u'content'] bufs = parent[u'buffers'] msg_id = parent['header']['msg_id'] bound = content.get('bound', False) except: print("Got bad msg: ", file=sys.__stderr__) print(Message(parent), file=sys.__stderr__) return # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) # self.iopub_stream.send(pyin_msg) # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) sub = {'dependencies_met' : True, 'engine' : self.identity, 'started': datetime.now().strftime(ISO8601)} try: # allow for not overriding displayhook if hasattr(sys.displayhook, 'set_parent'): sys.displayhook.set_parent(parent) sys.stdout.set_parent(parent) sys.stderr.set_parent(parent) # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns if bound: working = self.user_ns suffix = str(msg_id).replace("-","") prefix = "_" else: working = dict() suffix = prefix = "_" # prevent keyword collisions with lambda f,args,kwargs = unpack_apply_message(bufs, working, copy=False) # if f.fun if hasattr(f, 'func_name'): fname = f.func_name else: fname = f.__name__ fname = prefix+fname.strip('<>')+suffix argname = prefix+"args"+suffix kwargname = prefix+"kwargs"+suffix resultname = prefix+"result"+suffix ns = { fname : f, argname : args, kwargname : kwargs } # print ns working.update(ns) code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) exec code in working, working result = working.get(resultname) # clear the namespace if bound: for key in ns.iterkeys(): self.user_ns.pop(key) else: del working packed_result,buf = serialize_object(result) result_buf = [packed_result]+buf except: exc_content = self._wrap_exception('apply') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, ident=self.identity+'.pyerr') reply_content = exc_content result_buf = [] if exc_content['ename'] == UnmetDependency.__name__: sub['dependencies_met'] = False else: reply_content = {'status' : 'ok'} # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) # self.reply_socket.send(ident, zmq.SNDMORE) # self.reply_socket.send_json(reply_msg) reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf, subheader=sub) # print(Message(reply_msg), file=sys.__stdout__) # if reply_msg['content']['status'] == u'error': # self.abort_queues() def dispatch_queue(self, stream, msg): self.control_stream.flush() idents,msg = self.session.feed_identities(msg, copy=False) try: msg = self.session.unpack_message(msg, content=True, copy=False) except: logger.error("Invalid Message", exc_info=True) return header = msg['header'] msg_id = header['msg_id'] if self.check_aborted(msg_id): self.aborted.remove(msg_id) # is it safe to assume a msg_id will not be resubmitted? reply_type = msg['msg_type'].split('_')[0] + '_reply' reply_msg = self.session.send(stream, reply_type, content={'status' : 'aborted'}, parent=msg, ident=idents) return handler = self.shell_handlers.get(msg['msg_type'], None) if handler is None: print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__) else: handler(stream, idents, msg) def start(self): #### stream mode: if self.control_stream: self.control_stream.on_recv(self.dispatch_control, copy=False) self.control_stream.on_err(printer) def make_dispatcher(stream): def dispatcher(msg): return self.dispatch_queue(stream, msg) return dispatcher for s in self.shell_streams: s.on_recv(make_dispatcher(s), copy=False) s.on_err(printer) if self.iopub_stream: self.iopub_stream.on_err(printer) # self.iopub_stream.on_send(printer) #### while True mode: # while True: # idle = True # try: # msg = self.shell_stream.socket.recv_multipart( # zmq.NOBLOCK, copy=False) # except zmq.ZMQError, e: # if e.errno != zmq.EAGAIN: # raise e # else: # idle=False # self.dispatch_queue(self.shell_stream, msg) # # if not self.task_stream.empty(): # idle=False # msg = self.task_stream.recv_multipart() # self.dispatch_queue(self.task_stream, msg) # if idle: # # don't busywait # time.sleep(1e-3) def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, client_addr=None, loop=None, context=None, key=None, out_stream_factory=OutStream, display_hook_factory=DisplayHook): # create loop, context, and session: if loop is None: loop = ioloop.IOLoop.instance() if context is None: context = zmq.Context() c = context session = StreamSession(key=key) # print (session.key) print (control_addr, shell_addrs, iopub_addr, hb_addrs) # create Control Stream control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) control_stream.setsockopt(zmq.IDENTITY, identity) control_stream.connect(control_addr) # create Shell Streams (MUX, Task, etc.): shell_streams = [] for addr in shell_addrs: stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) stream.setsockopt(zmq.IDENTITY, identity) stream.connect(addr) shell_streams.append(stream) # create iopub stream: iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop) iopub_stream.setsockopt(zmq.IDENTITY, identity) iopub_stream.connect(iopub_addr) # Redirect input streams and set a display hook. if out_stream_factory: sys.stdout = out_stream_factory(session, iopub_stream, u'stdout') sys.stdout.topic = identity+'.stdout' sys.stderr = out_stream_factory(session, iopub_stream, u'stderr') sys.stderr.topic = identity+'.stderr' if display_hook_factory: sys.displayhook = display_hook_factory(session, iopub_stream) sys.displayhook.topic = identity+'.pyout' # launch heartbeat heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) heart.start() # create (optional) Client if client_addr: client = Client(client_addr, username=identity) else: client = None kernel = Kernel(session=session, control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, client=client, loop=loop) kernel.start() return loop, c, kernel