streamkernel.py
498 lines
| 17.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
""" | ||||
Kernel adapted from kernel.py to use ZMQ Streams | ||||
""" | ||||
MinRK
|
r3553 | from __future__ import print_function | ||
MinRK
|
r3539 | import __builtin__ | ||
MinRK
|
r3540 | import os | ||
MinRK
|
r3539 | import sys | ||
import time | ||||
import traceback | ||||
from signal import SIGTERM, SIGKILL | ||||
MinRK
|
r3540 | from pprint import pprint | ||
MinRK
|
r3539 | |||
from code import CommandCompiler | ||||
import zmq | ||||
from zmq.eventloop import ioloop, zmqstream | ||||
MinRK
|
r3546 | from IPython.zmq.completer import KernelCompleter | ||
MinRK
|
r3539 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ | ||
unpack_apply_message | ||||
MinRK
|
r3546 | from dependency import UnmetDependency | ||
MinRK
|
r3539 | |||
MinRK
|
r3540 | def printer(*args): | ||
pprint(args) | ||||
MinRK
|
r3539 | class OutStream(object): | ||
"""A file like object that publishes the stream to a 0MQ PUB socket.""" | ||||
def __init__(self, session, pub_socket, name, max_buffer=200): | ||||
self.session = session | ||||
self.pub_socket = pub_socket | ||||
self.name = name | ||||
self._buffer = [] | ||||
self._buffer_len = 0 | ||||
self.max_buffer = max_buffer | ||||
self.parent_header = {} | ||||
def set_parent(self, parent): | ||||
self.parent_header = extract_header(parent) | ||||
def close(self): | ||||
self.pub_socket = None | ||||
def flush(self): | ||||
if self.pub_socket is None: | ||||
raise ValueError(u'I/O operation on closed file') | ||||
else: | ||||
if self._buffer: | ||||
data = ''.join(self._buffer) | ||||
content = {u'name':self.name, u'data':data} | ||||
# msg = self.session.msg(u'stream', content=content, | ||||
# parent=self.parent_header) | ||||
msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header) | ||||
# print>>sys.__stdout__, Message(msg) | ||||
# self.pub_socket.send_json(msg) | ||||
self._buffer_len = 0 | ||||
self._buffer = [] | ||||
def isattr(self): | ||||
return False | ||||
def next(self): | ||||
raise IOError('Read not supported on a write only stream.') | ||||
def read(self, size=None): | ||||
raise IOError('Read not supported on a write only stream.') | ||||
readline=read | ||||
def write(self, s): | ||||
if self.pub_socket is None: | ||||
raise ValueError('I/O operation on closed file') | ||||
else: | ||||
self._buffer.append(s) | ||||
self._buffer_len += len(s) | ||||
self._maybe_send() | ||||
def _maybe_send(self): | ||||
if '\n' in self._buffer[-1]: | ||||
self.flush() | ||||
if self._buffer_len > self.max_buffer: | ||||
self.flush() | ||||
def writelines(self, sequence): | ||||
if self.pub_socket is None: | ||||
raise ValueError('I/O operation on closed file') | ||||
else: | ||||
for s in sequence: | ||||
self.write(s) | ||||
class DisplayHook(object): | ||||
def __init__(self, session, pub_socket): | ||||
self.session = session | ||||
self.pub_socket = pub_socket | ||||
self.parent_header = {} | ||||
def __call__(self, obj): | ||||
if obj is None: | ||||
return | ||||
__builtin__._ = obj | ||||
# msg = self.session.msg(u'pyout', {u'data':repr(obj)}, | ||||
# parent=self.parent_header) | ||||
# self.pub_socket.send_json(msg) | ||||
self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header) | ||||
def set_parent(self, parent): | ||||
self.parent_header = extract_header(parent) | ||||
class RawInput(object): | ||||
def __init__(self, session, socket): | ||||
self.session = session | ||||
self.socket = socket | ||||
def __call__(self, prompt=None): | ||||
msg = self.session.msg(u'raw_input') | ||||
self.socket.send_json(msg) | ||||
while True: | ||||
try: | ||||
reply = self.socket.recv_json(zmq.NOBLOCK) | ||||
MinRK
|
r3556 | except zmq.ZMQError as e: | ||
MinRK
|
r3539 | if e.errno == zmq.EAGAIN: | ||
pass | ||||
else: | ||||
raise | ||||
else: | ||||
break | ||||
return reply[u'content'][u'data'] | ||||
class Kernel(object): | ||||
def __init__(self, session, control_stream, reply_stream, pub_stream, | ||||
task_stream=None, client=None): | ||||
self.session = session | ||||
self.control_stream = control_stream | ||||
MinRK
|
r3541 | # self.control_socket = control_stream.socket | ||
MinRK
|
r3539 | self.reply_stream = reply_stream | ||
self.task_stream = task_stream | ||||
self.pub_stream = pub_stream | ||||
self.client = client | ||||
self.user_ns = {} | ||||
self.history = [] | ||||
self.compiler = CommandCompiler() | ||||
self.completer = KernelCompleter(self.user_ns) | ||||
self.aborted = set() | ||||
# Build dict of handlers for message types | ||||
self.queue_handlers = {} | ||||
self.control_handlers = {} | ||||
for msg_type in ['execute_request', 'complete_request', 'apply_request']: | ||||
self.queue_handlers[msg_type] = getattr(self, msg_type) | ||||
MinRK
|
r3546 | for msg_type in ['kill_request', 'abort_request']+self.queue_handlers.keys(): | ||
MinRK
|
r3539 | self.control_handlers[msg_type] = getattr(self, msg_type) | ||
#-------------------- control handlers ----------------------------- | ||||
MinRK
|
r3540 | def abort_queues(self): | ||
for stream in (self.task_stream, self.reply_stream): | ||||
if stream: | ||||
self.abort_queue(stream) | ||||
MinRK
|
r3539 | |||
def abort_queue(self, stream): | ||||
while True: | ||||
try: | ||||
msg = self.session.recv(stream, zmq.NOBLOCK,content=True) | ||||
MinRK
|
r3556 | except zmq.ZMQError as e: | ||
MinRK
|
r3539 | if e.errno == zmq.EAGAIN: | ||
break | ||||
else: | ||||
return | ||||
else: | ||||
if msg is None: | ||||
return | ||||
else: | ||||
idents,msg = msg | ||||
# assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." | ||||
# msg = self.reply_socket.recv_json() | ||||
MinRK
|
r3553 | print ("Aborting:", file=sys.__stdout__) | ||
print (Message(msg), file=sys.__stdout__) | ||||
MinRK
|
r3539 | msg_type = msg['msg_type'] | ||
reply_type = msg_type.split('_')[0] + '_reply' | ||||
# reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | ||||
# self.reply_socket.send(ident,zmq.SNDMORE) | ||||
# self.reply_socket.send_json(reply_msg) | ||||
reply_msg = self.session.send(stream, reply_type, | ||||
MinRK
|
r3550 | content={'status' : 'aborted'}, parent=msg, ident=idents)[0] | ||
MinRK
|
r3553 | print(Message(reply_msg), file=sys.__stdout__) | ||
MinRK
|
r3539 | # We need to wait a bit for requests to come in. This can probably | ||
# be set shorter for true asynchronous clients. | ||||
time.sleep(0.05) | ||||
def abort_request(self, stream, ident, parent): | ||||
MinRK
|
r3540 | """abort a specifig msg by id""" | ||
MinRK
|
r3539 | msg_ids = parent['content'].get('msg_ids', None) | ||
MinRK
|
r3540 | if isinstance(msg_ids, basestring): | ||
msg_ids = [msg_ids] | ||||
MinRK
|
r3539 | if not msg_ids: | ||
MinRK
|
r3540 | self.abort_queues() | ||
MinRK
|
r3539 | for mid in msg_ids: | ||
MinRK
|
r3540 | self.aborted.add(str(mid)) | ||
MinRK
|
r3539 | |||
content = dict(status='ok') | ||||
MinRK
|
r3550 | reply_msg = self.session.send(stream, 'abort_reply', content=content, | ||
parent=parent, ident=ident)[0] | ||||
MinRK
|
r3553 | print(Message(reply_msg), file=sys.__stdout__) | ||
MinRK
|
r3539 | |||
def kill_request(self, stream, idents, parent): | ||||
MinRK
|
r3540 | """kill ourselves. This should really be handled in an external process""" | ||
self.abort_queues() | ||||
MinRK
|
r3539 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, | ||
content = dict(status='ok')) | ||||
# we can know that a message is done if we *don't* use streams, but | ||||
# use a socket directly with MessageTracker | ||||
MinRK
|
r3540 | time.sleep(.5) | ||
MinRK
|
r3539 | os.kill(os.getpid(), SIGTERM) | ||
MinRK
|
r3540 | time.sleep(1) | ||
MinRK
|
r3539 | os.kill(os.getpid(), SIGKILL) | ||
def dispatch_control(self, msg): | ||||
idents,msg = self.session.feed_identities(msg, copy=False) | ||||
msg = self.session.unpack_message(msg, content=True, copy=False) | ||||
header = msg['header'] | ||||
msg_id = header['msg_id'] | ||||
handler = self.control_handlers.get(msg['msg_type'], None) | ||||
if handler is None: | ||||
MinRK
|
r3553 | print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__) | ||
MinRK
|
r3539 | else: | ||
MinRK
|
r3540 | handler(self.control_stream, idents, msg) | ||
MinRK
|
r3539 | |||
#-------------------- queue helpers ------------------------------ | ||||
def check_dependencies(self, dependencies): | ||||
if not dependencies: | ||||
return True | ||||
if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): | ||||
anyorall = dependencies[0] | ||||
dependencies = dependencies[1] | ||||
else: | ||||
anyorall = 'all' | ||||
results = self.client.get_results(dependencies,status_only=True) | ||||
if results['status'] != 'ok': | ||||
return False | ||||
if anyorall == 'any': | ||||
if not results['completed']: | ||||
return False | ||||
else: | ||||
if results['pending']: | ||||
return False | ||||
return True | ||||
MinRK
|
r3540 | def check_aborted(self, msg_id): | ||
return msg_id in self.aborted | ||||
MinRK
|
r3539 | #-------------------- queue handlers ----------------------------- | ||
def execute_request(self, stream, ident, parent): | ||||
try: | ||||
code = parent[u'content'][u'code'] | ||||
except: | ||||
MinRK
|
r3553 | print("Got bad msg: ", file=sys.__stderr__) | ||
print(Message(parent), file=sys.__stderr__) | ||||
MinRK
|
r3539 | return | ||
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | ||||
# self.pub_stream.send(pyin_msg) | ||||
self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | ||||
try: | ||||
comp_code = self.compiler(code, '<zmq-kernel>') | ||||
# allow for not overriding displayhook | ||||
if hasattr(sys.displayhook, 'set_parent'): | ||||
sys.displayhook.set_parent(parent) | ||||
exec comp_code in self.user_ns, self.user_ns | ||||
except: | ||||
# result = u'error' | ||||
etype, evalue, tb = sys.exc_info() | ||||
tb = traceback.format_exception(etype, evalue, tb) | ||||
exc_content = { | ||||
u'status' : u'error', | ||||
u'traceback' : tb, | ||||
u'etype' : unicode(etype), | ||||
u'evalue' : unicode(evalue) | ||||
} | ||||
# exc_msg = self.session.msg(u'pyerr', exc_content, parent) | ||||
self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | ||||
reply_content = exc_content | ||||
else: | ||||
reply_content = {'status' : 'ok'} | ||||
# reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | ||||
# self.reply_socket.send(ident, zmq.SNDMORE) | ||||
# self.reply_socket.send_json(reply_msg) | ||||
reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) | ||||
MinRK
|
r3553 | print(Message(reply_msg), file=sys.__stdout__) | ||
MinRK
|
r3539 | if reply_msg['content']['status'] == u'error': | ||
MinRK
|
r3540 | self.abort_queues() | ||
MinRK
|
r3539 | |||
def complete_request(self, stream, ident, parent): | ||||
matches = {'matches' : self.complete(parent), | ||||
'status' : 'ok'} | ||||
completion_msg = self.session.send(stream, 'complete_reply', | ||||
matches, parent, ident) | ||||
# print >> sys.__stdout__, completion_msg | ||||
def complete(self, msg): | ||||
return self.completer.complete(msg.content.line, msg.content.text) | ||||
def apply_request(self, stream, ident, parent): | ||||
MinRK
|
r3553 | print (parent) | ||
MinRK
|
r3539 | try: | ||
content = parent[u'content'] | ||||
bufs = parent[u'buffers'] | ||||
msg_id = parent['header']['msg_id'] | ||||
bound = content.get('bound', False) | ||||
except: | ||||
MinRK
|
r3553 | print("Got bad msg: ", file=sys.__stderr__) | ||
print(Message(parent), file=sys.__stderr__) | ||||
MinRK
|
r3539 | return | ||
# pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | ||||
# self.pub_stream.send(pyin_msg) | ||||
# self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | ||||
MinRK
|
r3546 | sub = {'dependencies_met' : True} | ||
MinRK
|
r3539 | try: | ||
# allow for not overriding displayhook | ||||
if hasattr(sys.displayhook, 'set_parent'): | ||||
sys.displayhook.set_parent(parent) | ||||
# exec "f(*args,**kwargs)" in self.user_ns, self.user_ns | ||||
if bound: | ||||
working = self.user_ns | ||||
suffix = str(msg_id).replace("-","") | ||||
prefix = "_" | ||||
else: | ||||
working = dict() | ||||
MinRK
|
r3540 | suffix = prefix = "_" # prevent keyword collisions with lambda | ||
MinRK
|
r3539 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | ||
# if f.fun | ||||
fname = prefix+f.func_name.strip('<>')+suffix | ||||
argname = prefix+"args"+suffix | ||||
kwargname = prefix+"kwargs"+suffix | ||||
resultname = prefix+"result"+suffix | ||||
ns = { fname : f, argname : args, kwargname : kwargs } | ||||
# print ns | ||||
working.update(ns) | ||||
code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) | ||||
exec code in working, working | ||||
result = working.get(resultname) | ||||
# clear the namespace | ||||
if bound: | ||||
for key in ns.iterkeys(): | ||||
self.user_ns.pop(key) | ||||
else: | ||||
del working | ||||
packed_result,buf = serialize_object(result) | ||||
result_buf = [packed_result]+buf | ||||
except: | ||||
result = u'error' | ||||
etype, evalue, tb = sys.exc_info() | ||||
tb = traceback.format_exception(etype, evalue, tb) | ||||
exc_content = { | ||||
u'status' : u'error', | ||||
u'traceback' : tb, | ||||
u'etype' : unicode(etype), | ||||
u'evalue' : unicode(evalue) | ||||
} | ||||
# exc_msg = self.session.msg(u'pyerr', exc_content, parent) | ||||
self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | ||||
reply_content = exc_content | ||||
result_buf = [] | ||||
MinRK
|
r3546 | |||
if etype is UnmetDependency: | ||||
sub = {'dependencies_met' : False} | ||||
MinRK
|
r3539 | else: | ||
reply_content = {'status' : 'ok'} | ||||
# reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | ||||
# self.reply_socket.send(ident, zmq.SNDMORE) | ||||
# self.reply_socket.send_json(reply_msg) | ||||
MinRK
|
r3546 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, | ||
parent=parent, ident=ident,buffers=result_buf, subheader=sub) | ||||
MinRK
|
r3553 | print(Message(reply_msg), file=sys.__stdout__) | ||
MinRK
|
r3546 | # if reply_msg['content']['status'] == u'error': | ||
# self.abort_queues() | ||||
MinRK
|
r3539 | |||
def dispatch_queue(self, stream, msg): | ||||
MinRK
|
r3541 | self.control_stream.flush() | ||
MinRK
|
r3539 | idents,msg = self.session.feed_identities(msg, copy=False) | ||
msg = self.session.unpack_message(msg, content=True, copy=False) | ||||
header = msg['header'] | ||||
msg_id = header['msg_id'] | ||||
if self.check_aborted(msg_id): | ||||
MinRK
|
r3540 | self.aborted.remove(msg_id) | ||
# is it safe to assume a msg_id will not be resubmitted? | ||||
reply_type = msg['msg_type'].split('_')[0] + '_reply' | ||||
reply_msg = self.session.send(stream, reply_type, | ||||
content={'status' : 'aborted'}, parent=msg, ident=idents) | ||||
return | ||||
MinRK
|
r3539 | handler = self.queue_handlers.get(msg['msg_type'], None) | ||
if handler is None: | ||||
MinRK
|
r3553 | print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__) | ||
MinRK
|
r3539 | else: | ||
handler(stream, idents, msg) | ||||
def start(self): | ||||
#### stream mode: | ||||
if self.control_stream: | ||||
self.control_stream.on_recv(self.dispatch_control, copy=False) | ||||
MinRK
|
r3540 | self.control_stream.on_err(printer) | ||
MinRK
|
r3539 | if self.reply_stream: | ||
self.reply_stream.on_recv(lambda msg: | ||||
self.dispatch_queue(self.reply_stream, msg), copy=False) | ||||
MinRK
|
r3540 | self.reply_stream.on_err(printer) | ||
MinRK
|
r3539 | if self.task_stream: | ||
self.task_stream.on_recv(lambda msg: | ||||
self.dispatch_queue(self.task_stream, msg), copy=False) | ||||
MinRK
|
r3540 | self.task_stream.on_err(printer) | ||
MinRK
|
r3539 | |||
#### while True mode: | ||||
# while True: | ||||
# idle = True | ||||
# try: | ||||
# msg = self.reply_stream.socket.recv_multipart( | ||||
# zmq.NOBLOCK, copy=False) | ||||
# except zmq.ZMQError, e: | ||||
# if e.errno != zmq.EAGAIN: | ||||
# raise e | ||||
# else: | ||||
# idle=False | ||||
# self.dispatch_queue(self.reply_stream, msg) | ||||
# | ||||
# if not self.task_stream.empty(): | ||||
# idle=False | ||||
# msg = self.task_stream.recv_multipart() | ||||
# self.dispatch_queue(self.task_stream, msg) | ||||
# if idle: | ||||
# # don't busywait | ||||
# time.sleep(1e-3) | ||||
def main(): | ||||
raise Exception("Don't run me anymore") | ||||
loop = ioloop.IOLoop.instance() | ||||
c = zmq.Context() | ||||
ip = '127.0.0.1' | ||||
port_base = 5575 | ||||
connection = ('tcp://%s' % ip) + ':%i' | ||||
rep_conn = connection % port_base | ||||
pub_conn = connection % (port_base+1) | ||||
MinRK
|
r3553 | print("Starting the kernel...", file=sys.__stdout__) | ||
MinRK
|
r3539 | # print >>sys.__stdout__, "XREQ Channel:", rep_conn | ||
# print >>sys.__stdout__, "PUB Channel:", pub_conn | ||||
session = StreamSession(username=u'kernel') | ||||
reply_socket = c.socket(zmq.XREQ) | ||||
reply_socket.connect(rep_conn) | ||||
pub_socket = c.socket(zmq.PUB) | ||||
pub_socket.connect(pub_conn) | ||||
stdout = OutStream(session, pub_socket, u'stdout') | ||||
stderr = OutStream(session, pub_socket, u'stderr') | ||||
sys.stdout = stdout | ||||
sys.stderr = stderr | ||||
display_hook = DisplayHook(session, pub_socket) | ||||
sys.displayhook = display_hook | ||||
reply_stream = zmqstream.ZMQStream(reply_socket,loop) | ||||
pub_stream = zmqstream.ZMQStream(pub_socket,loop) | ||||
kernel = Kernel(session, reply_stream, pub_stream) | ||||
# For debugging convenience, put sleep and a string in the namespace, so we | ||||
# have them every time we start. | ||||
kernel.user_ns['sleep'] = time.sleep | ||||
kernel.user_ns['s'] = 'Test string' | ||||
MinRK
|
r3553 | print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__) | ||
MinRK
|
r3539 | kernel.start() | ||
loop.start() | ||||
if __name__ == '__main__': | ||||
main() | ||||