diff --git a/IPython/zmq/parallel/engine.py b/IPython/zmq/parallel/engine.py index 92af2a5..4eead85 100644 --- a/IPython/zmq/parallel/engine.py +++ b/IPython/zmq/parallel/engine.py @@ -13,9 +13,12 @@ from pprint import pprint import zmq from zmq.eventloop import ioloop, zmqstream +from IPython.utils.traitlets import HasTraits +from IPython.utils.localinterfaces import LOCALHOST + from streamsession import Message, StreamSession from client import Client -import streamkernel as kernel +from streamkernel import Kernel, make_kernel import heartmonitor from entry_point import make_base_argument_parser, connect_logger, parse_url # import taskthread @@ -59,45 +62,25 @@ class Engine(object): if msg.content.status == 'ok': self.session.username = str(msg.content.id) queue_addr = msg.content.queue - if queue_addr: - queue = self.context.socket(zmq.PAIR) - queue.setsockopt(zmq.IDENTITY, self.ident) - queue.connect(str(queue_addr)) - self.queue = zmqstream.ZMQStream(queue, self.loop) - - control_addr = msg.content.control - if control_addr: - control = self.context.socket(zmq.PAIR) - control.setsockopt(zmq.IDENTITY, self.ident) - control.connect(str(control_addr)) - self.control = zmqstream.ZMQStream(control, self.loop) - + shell_addrs = [str(queue_addr)] + control_addr = str(msg.content.control) task_addr = msg.content.task - print (task_addr) if task_addr: - # task as stream: - task = self.context.socket(zmq.PAIR) - task.setsockopt(zmq.IDENTITY, self.ident) - task.connect(str(task_addr)) - self.task_stream = zmqstream.ZMQStream(task, self.loop) - # TaskThread: - # mon_addr = msg.content.monitor - # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident) - # task.connect_in(str(task_addr)) - # task.connect_out(str(mon_addr)) - # self.task_stream = taskthread.QueueStream(*task.queues) - # task.start() + shell_addrs.append(str(task_addr)) - hbs = msg.content.heartbeat - self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident) - self.heart.start() + hb_addrs = msg.content.heartbeat # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() - # placeholder for now: - pub = self.context.socket(zmq.PUB) - pub = zmqstream.ZMQStream(pub, self.loop) - # create and start the kernel - self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client) - self.kernel.start() + + # placeholder for no, since pub isn't hooked up: + sub = self.context.socket(zmq.SUB) + sub = zmqstream.ZMQStream(sub, self.loop) + sub.on_recv(lambda *a: None) + port = sub.bind_to_random_port("tcp://%s"%LOCALHOST) + iopub_addr = "tcp://%s:%i"%(LOCALHOST,12345) + + make_kernel(self.ident, control_addr, shell_addrs, iopub_addr, hb_addrs, + client_addr=None, loop=self.loop, context=self.context) + else: # logger.error("Registration Failed: %s"%msg) raise Exception("Registration Failed: %s"%msg) @@ -114,6 +97,7 @@ class Engine(object): def start(self): print ("registering") self.register() + def main(): diff --git a/IPython/zmq/parallel/streamkernel.py b/IPython/zmq/parallel/streamkernel.py index d6834ea..f3139f1 100755 --- a/IPython/zmq/parallel/streamkernel.py +++ b/IPython/zmq/parallel/streamkernel.py @@ -3,8 +3,14 @@ Kernel adapted from kernel.py to use ZMQ Streams """ +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +# Standard library imports. from __future__ import print_function import __builtin__ +from code import CommandCompiler import os import sys import time @@ -12,141 +18,43 @@ import traceback from signal import SIGTERM, SIGKILL from pprint import pprint -from code import CommandCompiler - +# System library imports. import zmq from zmq.eventloop import ioloop, zmqstream +# Local imports. +from IPython.utils.traitlets import HasTraits, Instance, List from IPython.zmq.completer import KernelCompleter from streamsession import StreamSession, Message, extract_header, serialize_object,\ unpack_apply_message from dependency import UnmetDependency +import heartmonitor +from client import Client def printer(*args): pprint(args) -class OutStream(object): - """A file like object that publishes the stream to a 0MQ PUB socket.""" - - def __init__(self, session, pub_socket, name, max_buffer=200): - self.session = session - self.pub_socket = pub_socket - self.name = name - self._buffer = [] - self._buffer_len = 0 - self.max_buffer = max_buffer - self.parent_header = {} - - def set_parent(self, parent): - self.parent_header = extract_header(parent) - - def close(self): - self.pub_socket = None - - def flush(self): - if self.pub_socket is None: - raise ValueError(u'I/O operation on closed file') - else: - if self._buffer: - data = ''.join(self._buffer) - content = {u'name':self.name, u'data':data} - # msg = self.session.msg(u'stream', content=content, - # parent=self.parent_header) - msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header) - # print>>sys.__stdout__, Message(msg) - # self.pub_socket.send_json(msg) - self._buffer_len = 0 - self._buffer = [] - - def isattr(self): - return False - - def next(self): - raise IOError('Read not supported on a write only stream.') - - def read(self, size=None): - raise IOError('Read not supported on a write only stream.') - - readline=read - - def write(self, s): - if self.pub_socket is None: - raise ValueError('I/O operation on closed file') - else: - self._buffer.append(s) - self._buffer_len += len(s) - self._maybe_send() - - def _maybe_send(self): - if '\n' in self._buffer[-1]: - self.flush() - if self._buffer_len > self.max_buffer: - self.flush() - - def writelines(self, sequence): - if self.pub_socket is None: - raise ValueError('I/O operation on closed file') - else: - for s in sequence: - self.write(s) - - -class DisplayHook(object): - - def __init__(self, session, pub_socket): - self.session = session - self.pub_socket = pub_socket - self.parent_header = {} - - def __call__(self, obj): - if obj is None: - return - - __builtin__._ = obj - # msg = self.session.msg(u'pyout', {u'data':repr(obj)}, - # parent=self.parent_header) - # self.pub_socket.send_json(msg) - self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header) - - def set_parent(self, parent): - self.parent_header = extract_header(parent) - - -class RawInput(object): - - def __init__(self, session, socket): - self.session = session - self.socket = socket - - def __call__(self, prompt=None): - msg = self.session.msg(u'raw_input') - self.socket.send_json(msg) - while True: - try: - reply = self.socket.recv_json(zmq.NOBLOCK) - except zmq.ZMQError as e: - if e.errno == zmq.EAGAIN: - pass - else: - raise - else: - break - return reply[u'content'][u'data'] +#----------------------------------------------------------------------------- +# Main kernel class +#----------------------------------------------------------------------------- +class Kernel(HasTraits): -class Kernel(object): + #--------------------------------------------------------------------------- + # Kernel interface + #--------------------------------------------------------------------------- - def __init__(self, session, control_stream, reply_stream, pub_stream, - task_stream=None, client=None): - self.session = session - self.control_stream = control_stream - # self.control_socket = control_stream.socket - self.reply_stream = reply_stream - self.identity = self.reply_stream.getsockopt(zmq.IDENTITY) - self.task_stream = task_stream - self.pub_stream = pub_stream - self.client = client + session = Instance(StreamSession) + shell_streams = Instance(list) + control_stream = Instance(zmqstream.ZMQStream) + task_stream = Instance(zmqstream.ZMQStream) + iopub_stream = Instance(zmqstream.ZMQStream) + client = Instance(Client) + + def __init__(self, **kwargs): + super(Kernel, self).__init__(**kwargs) + self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY) self.user_ns = {} self.history = [] self.compiler = CommandCompiler() @@ -154,17 +62,18 @@ class Kernel(object): self.aborted = set() # Build dict of handlers for message types - self.queue_handlers = {} + self.shell_handlers = {} self.control_handlers = {} - for msg_type in ['execute_request', 'complete_request', 'apply_request']: - self.queue_handlers[msg_type] = getattr(self, msg_type) + for msg_type in ['execute_request', 'complete_request', 'apply_request', + 'clear_request']: + self.shell_handlers[msg_type] = getattr(self, msg_type) - for msg_type in ['kill_request', 'abort_request', 'clear_request']+self.queue_handlers.keys(): + for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys(): self.control_handlers[msg_type] = getattr(self, msg_type) #-------------------- control handlers ----------------------------- def abort_queues(self): - for stream in (self.task_stream, self.reply_stream): + for stream in self.shell_streams: if stream: self.abort_queue(stream) @@ -214,23 +123,17 @@ class Kernel(object): parent=parent, ident=ident)[0] print(Message(reply_msg), file=sys.__stdout__) - def kill_request(self, stream, idents, parent): + def shutdown_request(self, stream, ident, parent): """kill ourself. This should really be handled in an external process""" self.abort_queues() - msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, - content = dict(status='ok')) - # we can know that a message is done if we *don't* use streams, but - # use a socket directly with MessageTracker - time.sleep(.5) - os.kill(os.getpid(), SIGTERM) - time.sleep(1) - os.kill(os.getpid(), SIGKILL) - - def clear_request(self, stream, idents, parent): - """Clear our namespace.""" - self.user_ns = {} - msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, - content = dict(status='ok')) + content = dict(parent['content']) + msg = self.session.send(self.reply_socket, 'shutdown_reply', + content, parent, ident) + msg = self.session.send(self.pub_socket, 'shutdown_reply', + content, parent, ident) + # print >> sys.__stdout__, msg + time.sleep(0.1) + sys.exit(0) def dispatch_control(self, msg): idents,msg = self.session.feed_identities(msg, copy=False) @@ -274,6 +177,12 @@ class Kernel(object): #-------------------- queue handlers ----------------------------- + def clear_request(self, stream, idents, parent): + """Clear our namespace.""" + self.user_ns = {} + msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent, + content = dict(status='ok')) + def execute_request(self, stream, ident, parent): try: code = parent[u'content'][u'code'] @@ -282,8 +191,8 @@ class Kernel(object): print(Message(parent), file=sys.__stderr__) return # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) - # self.pub_stream.send(pyin_msg) - self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) + # self.iopub_stream.send(pyin_msg) + self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) try: comp_code = self.compiler(code, '') # allow for not overriding displayhook @@ -301,7 +210,7 @@ class Kernel(object): u'evalue' : unicode(evalue) } # exc_msg = self.session.msg(u'pyerr', exc_content, parent) - self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) + self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) reply_content = exc_content else: reply_content = {'status' : 'ok'} @@ -335,8 +244,8 @@ class Kernel(object): print(Message(parent), file=sys.__stderr__) return # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) - # self.pub_stream.send(pyin_msg) - # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) + # self.iopub_stream.send(pyin_msg) + # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) sub = {'dependencies_met' : True, 'engine' : self.identity} try: # allow for not overriding displayhook @@ -384,7 +293,7 @@ class Kernel(object): u'evalue' : unicode(evalue) } # exc_msg = self.session.msg(u'pyerr', exc_content, parent) - self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) + self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent) reply_content = exc_content result_buf = [] @@ -415,7 +324,7 @@ class Kernel(object): reply_msg = self.session.send(stream, reply_type, content={'status' : 'aborted'}, parent=msg, ident=idents) return - handler = self.queue_handlers.get(msg['msg_type'], None) + handler = self.shell_handlers.get(msg['msg_type'], None) if handler is None: print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__) else: @@ -426,27 +335,28 @@ class Kernel(object): if self.control_stream: self.control_stream.on_recv(self.dispatch_control, copy=False) self.control_stream.on_err(printer) - if self.reply_stream: - self.reply_stream.on_recv(lambda msg: - self.dispatch_queue(self.reply_stream, msg), copy=False) - self.reply_stream.on_err(printer) - if self.task_stream: - self.task_stream.on_recv(lambda msg: - self.dispatch_queue(self.task_stream, msg), copy=False) - self.task_stream.on_err(printer) + + for s in self.shell_streams: + s.on_recv(lambda msg: + self.dispatch_queue(s, msg), copy=False) + s.on_err(printer) + + if self.iopub_stream: + self.iopub_stream.on_err(printer) + self.iopub_stream.on_send(printer) #### while True mode: # while True: # idle = True # try: - # msg = self.reply_stream.socket.recv_multipart( + # msg = self.shell_stream.socket.recv_multipart( # zmq.NOBLOCK, copy=False) # except zmq.ZMQError, e: # if e.errno != zmq.EAGAIN: # raise e # else: # idle=False - # self.dispatch_queue(self.reply_stream, msg) + # self.dispatch_queue(self.shell_stream, msg) # # if not self.task_stream.empty(): # idle=False @@ -456,50 +366,48 @@ class Kernel(object): # # don't busywait # time.sleep(1e-3) - -def main(): - raise Exception("Don't run me anymore") - loop = ioloop.IOLoop.instance() - c = zmq.Context() - - ip = '127.0.0.1' - port_base = 5575 - connection = ('tcp://%s' % ip) + ':%i' - rep_conn = connection % port_base - pub_conn = connection % (port_base+1) - - print("Starting the kernel...", file=sys.__stdout__) - # print >>sys.__stdout__, "XREQ Channel:", rep_conn - # print >>sys.__stdout__, "PUB Channel:", pub_conn - - session = StreamSession(username=u'kernel') - - reply_socket = c.socket(zmq.XREQ) - reply_socket.connect(rep_conn) - - pub_socket = c.socket(zmq.PUB) - pub_socket.connect(pub_conn) - - stdout = OutStream(session, pub_socket, u'stdout') - stderr = OutStream(session, pub_socket, u'stderr') - sys.stdout = stdout - sys.stderr = stderr - - display_hook = DisplayHook(session, pub_socket) - sys.displayhook = display_hook - reply_stream = zmqstream.ZMQStream(reply_socket,loop) - pub_stream = zmqstream.ZMQStream(pub_socket,loop) - kernel = Kernel(session, reply_stream, pub_stream) - - # For debugging convenience, put sleep and a string in the namespace, so we - # have them every time we start. - kernel.user_ns['sleep'] = time.sleep - kernel.user_ns['s'] = 'Test string' +def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs, + client_addr=None, loop=None, context=None): + # create loop, context, and session: + if loop is None: + loop = ioloop.IOLoop.instance() + if context is None: + context = zmq.Context() + c = context + session = StreamSession() + print (control_addr, shell_addrs, iopub_addr, hb_addrs) + + # create Control Stream + control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) + control_stream.setsockopt(zmq.IDENTITY, identity) + control_stream.connect(control_addr) + + # create Shell Streams (MUX, Task, etc.): + shell_streams = [] + for addr in shell_addrs: + stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop) + stream.setsockopt(zmq.IDENTITY, identity) + stream.connect(addr) + shell_streams.append(stream) - print ("Use Ctrl-\\ (NOT Ctrl-C!) to terminate.", file=sys.__stdout__) + # create iopub stream: + iopub_stream = zmqstream.ZMQStream(c.socket(zmq.PUB), loop) + iopub_stream.setsockopt(zmq.IDENTITY, identity) + iopub_stream.connect(iopub_addr) + + # launch heartbeat + heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity) + heart.start() + + # create (optional) Client + if client_addr: + client = Client(client_addr, username=identity) + else: + client = None + + kernel = Kernel(session=session, control_stream=control_stream, + shell_streams=shell_streams, iopub_stream=iopub_stream, + client=client) kernel.start() - loop.start() - + return loop, c -if __name__ == '__main__': - main()