From 6b2ea5aa73bb36ba8e83e8998bb5853a3895f17f 2010-07-02 16:11:35 From: Fernando Perez Date: 2010-07-02 16:11:35 Subject: [PATCH] Added files from our zmq prototype into main ipython tree. Currently added unmodified. --- diff --git a/IPython/zmq/__init__.py b/IPython/zmq/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/IPython/zmq/__init__.py diff --git a/IPython/zmq/completer.py b/IPython/zmq/completer.py new file mode 100644 index 0000000..cf55899 --- /dev/null +++ b/IPython/zmq/completer.py @@ -0,0 +1,86 @@ +"""Tab-completion over zmq""" + +# Trying to get print statements to work during completion, not very +# successfully... +from __future__ import print_function + +import itertools +import readline +import rlcompleter +import time + +import session + +class KernelCompleter(object): + """Kernel-side completion machinery.""" + def __init__(self, namespace): + self.namespace = namespace + self.completer = rlcompleter.Completer(namespace) + + def complete(self, line, text): + # We'll likely use linel later even if now it's not used for anything + matches = [] + complete = self.completer.complete + for state in itertools.count(): + comp = complete(text, state) + if comp is None: + break + matches.append(comp) + return matches + + +class ClientCompleter(object): + """Client-side completion machinery. + + How it works: self.complete will be called multiple times, with + state=0,1,2,... When state=0 it should compute ALL the completion matches, + and then return them for each value of state.""" + + def __init__(self, client, session, socket): + # ugly, but we get called asynchronously and need access to some + # client state, like backgrounded code + self.client = client + self.session = session + self.socket = socket + self.matches = [] + + def request_completion(self, text): + # Get full line to give to the kernel in case it wants more info. + line = readline.get_line_buffer() + # send completion request to kernel + msg = self.session.send(self.socket, + 'complete_request', + dict(text=text, line=line)) + + # Give the kernel up to 0.5s to respond + for i in range(5): + rep = self.session.recv(self.socket) + if rep is not None and rep.msg_type == 'complete_reply': + matches = rep.content.matches + break + time.sleep(0.1) + else: + # timeout + print ('TIMEOUT') # Can't see this message... + matches = None + return matches + + def complete(self, text, state): + + if self.client.backgrounded > 0: + print("\n[Not completing, background tasks active]") + print(readline.get_line_buffer(), end='') + return None + + if state==0: + matches = self.request_completion(text) + if matches is None: + self.matches = [] + print('WARNING: Kernel timeout on tab completion.') + else: + self.matches = matches + + try: + return self.matches[state] + except IndexError: + return None diff --git a/IPython/zmq/frontend.py b/IPython/zmq/frontend.py new file mode 100755 index 0000000..4dc73c9 --- /dev/null +++ b/IPython/zmq/frontend.py @@ -0,0 +1,194 @@ +#!/usr/bin/env python +"""A simple interactive frontend that talks to a kernel over 0MQ. +""" + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- +# stdlib +import cPickle as pickle +import code +import readline +import sys +import time +import uuid + +# our own +import zmq +import session +import completer + +#----------------------------------------------------------------------------- +# Classes and functions +#----------------------------------------------------------------------------- + +class Console(code.InteractiveConsole): + + def __init__(self, locals=None, filename="", + session = session, + request_socket=None, + sub_socket=None): + code.InteractiveConsole.__init__(self, locals, filename) + self.session = session + self.request_socket = request_socket + self.sub_socket = sub_socket + self.backgrounded = 0 + self.messages = {} + + # Set tab completion + self.completer = completer.ClientCompleter(self, session, request_socket) + readline.parse_and_bind('tab: complete') + readline.parse_and_bind('set show-all-if-ambiguous on') + readline.set_completer(self.completer.complete) + + # Set system prompts + sys.ps1 = 'Py>>> ' + sys.ps2 = ' ... ' + sys.ps3 = 'Out : ' + # Build dict of handlers for message types + self.handlers = {} + for msg_type in ['pyin', 'pyout', 'pyerr', 'stream']: + self.handlers[msg_type] = getattr(self, 'handle_%s' % msg_type) + + def handle_pyin(self, omsg): + if omsg.parent_header.session == self.session.session: + return + c = omsg.content.code.rstrip() + if c: + print '[IN from %s]' % omsg.parent_header.username + print c + + def handle_pyout(self, omsg): + #print omsg # dbg + if omsg.parent_header.session == self.session.session: + print "%s%s" % (sys.ps3, omsg.content.data) + else: + print '[Out from %s]' % omsg.parent_header.username + print omsg.content.data + + def print_pyerr(self, err): + print >> sys.stderr, err.etype,':', err.evalue + print >> sys.stderr, ''.join(err.traceback) + + def handle_pyerr(self, omsg): + if omsg.parent_header.session == self.session.session: + return + print >> sys.stderr, '[ERR from %s]' % omsg.parent_header.username + self.print_pyerr(omsg.content) + + def handle_stream(self, omsg): + if omsg.content.name == 'stdout': + outstream = sys.stdout + else: + outstream = sys.stderr + print >> outstream, '*ERR*', + print >> outstream, omsg.content.data, + + def handle_output(self, omsg): + handler = self.handlers.get(omsg.msg_type, None) + if handler is not None: + handler(omsg) + + def recv_output(self): + while True: + omsg = self.session.recv(self.sub_socket) + if omsg is None: + break + self.handle_output(omsg) + + def handle_reply(self, rep): + # Handle any side effects on output channels + self.recv_output() + # Now, dispatch on the possible reply types we must handle + if rep is None: + return + if rep.content.status == 'error': + self.print_pyerr(rep.content) + elif rep.content.status == 'aborted': + print >> sys.stderr, "ERROR: ABORTED" + ab = self.messages[rep.parent_header.msg_id].content + if 'code' in ab: + print >> sys.stderr, ab.code + else: + print >> sys.stderr, ab + + def recv_reply(self): + rep = self.session.recv(self.request_socket) + self.handle_reply(rep) + return rep + + def runcode(self, code): + # We can't pickle code objects, so fetch the actual source + src = '\n'.join(self.buffer) + + # for non-background inputs, if we do have previoiusly backgrounded + # jobs, check to see if they've produced results + if not src.endswith(';'): + while self.backgrounded > 0: + #print 'checking background' + rep = self.recv_reply() + if rep: + self.backgrounded -= 1 + time.sleep(0.05) + + # Send code execution message to kernel + omsg = self.session.send(self.request_socket, + 'execute_request', dict(code=src)) + self.messages[omsg.header.msg_id] = omsg + + # Fake asynchronicity by letting the user put ';' at the end of the line + if src.endswith(';'): + self.backgrounded += 1 + return + + # For foreground jobs, wait for reply + while True: + rep = self.recv_reply() + if rep is not None: + break + self.recv_output() + time.sleep(0.05) + else: + # We exited without hearing back from the kernel! + print >> sys.stderr, 'ERROR!!! kernel never got back to us!!!' + + +class InteractiveClient(object): + def __init__(self, session, request_socket, sub_socket): + self.session = session + self.request_socket = request_socket + self.sub_socket = sub_socket + self.console = Console(None, '', + session, request_socket, sub_socket) + + def interact(self): + self.console.interact() + + +def main(): + # Defaults + #ip = '192.168.2.109' + ip = '127.0.0.1' + #ip = '99.146.222.252' + port_base = 5575 + connection = ('tcp://%s' % ip) + ':%i' + req_conn = connection % port_base + sub_conn = connection % (port_base+1) + + # Create initial sockets + c = zmq.Context() + request_socket = c.socket(zmq.XREQ) + request_socket.connect(req_conn) + + sub_socket = c.socket(zmq.SUB) + sub_socket.connect(sub_conn) + sub_socket.setsockopt(zmq.SUBSCRIBE, '') + + # Make session and user-facing client + sess = session.Session() + client = InteractiveClient(sess, request_socket, sub_socket) + client.interact() + + +if __name__ == '__main__': + main() diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py new file mode 100644 index 0000000..658b3ed --- /dev/null +++ b/IPython/zmq/session.py @@ -0,0 +1,119 @@ +import os +import uuid +import pprint + +import zmq + +class Message(object): + """A simple message object that maps dict keys to attributes. + + A Message can be created from a dict and a dict from a Message instance + simply by calling dict(msg_obj).""" + + def __init__(self, msg_dict): + dct = self.__dict__ + for k, v in msg_dict.iteritems(): + if isinstance(v, dict): + v = Message(v) + dct[k] = v + + # Having this iterator lets dict(msg_obj) work out of the box. + def __iter__(self): + return iter(self.__dict__.iteritems()) + + def __repr__(self): + return repr(self.__dict__) + + def __str__(self): + return pprint.pformat(self.__dict__) + + def __contains__(self, k): + return k in self.__dict__ + + def __getitem__(self, k): + return self.__dict__[k] + + +def msg_header(msg_id, username, session): + return { + 'msg_id' : msg_id, + 'username' : username, + 'session' : session + } + + +def extract_header(msg_or_header): + """Given a message or header, return the header.""" + if not msg_or_header: + return {} + try: + # See if msg_or_header is the entire message. + h = msg_or_header['header'] + except KeyError: + try: + # See if msg_or_header is just the header + h = msg_or_header['msg_id'] + except KeyError: + raise + else: + h = msg_or_header + if not isinstance(h, dict): + h = dict(h) + return h + + +class Session(object): + + def __init__(self, username=os.environ.get('USER','username')): + self.username = username + self.session = str(uuid.uuid4()) + self.msg_id = 0 + + def msg_header(self): + h = msg_header(self.msg_id, self.username, self.session) + self.msg_id += 1 + return h + + def msg(self, msg_type, content=None, parent=None): + msg = {} + msg['header'] = self.msg_header() + msg['parent_header'] = {} if parent is None else extract_header(parent) + msg['msg_type'] = msg_type + msg['content'] = {} if content is None else content + return msg + + def send(self, socket, msg_type, content=None, parent=None, ident=None): + msg = self.msg(msg_type, content, parent) + if ident is not None: + socket.send(ident, zmq.SNDMORE) + socket.send_json(msg) + omsg = Message(msg) + return omsg + + def recv(self, socket, mode=zmq.NOBLOCK): + try: + msg = socket.recv_json(mode) + except zmq.ZMQError, e: + if e.errno == zmq.EAGAIN: + # We can convert EAGAIN to None as we know in this case + # recv_json won't return None. + return None + else: + raise + return Message(msg) + +def test_msg2obj(): + am = dict(x=1) + ao = Message(am) + assert ao.x == am['x'] + + am['y'] = dict(z=1) + ao = Message(am) + assert ao.y.z == am['y']['z'] + + k1, k2 = 'y', 'z' + assert ao[k1][k2] == am[k1][k2] + + am2 = dict(ao) + assert am['x'] == am2['x'] + assert am['y']['z'] == am2['y']['z'] diff --git a/docs/source/development/index.txt b/docs/source/development/index.txt index d09b0de..1d5bbee 100644 --- a/docs/source/development/index.txt +++ b/docs/source/development/index.txt @@ -14,6 +14,7 @@ IPython developer's guide release.txt roadmap.txt reorg.txt + messaging.txt magic_blueprint.txt notification_blueprint.txt ipgraph.txt diff --git a/docs/source/development/messaging.rst b/docs/source/development/messaging.rst new file mode 100644 index 0000000..9cf6253 --- /dev/null +++ b/docs/source/development/messaging.rst @@ -0,0 +1,97 @@ +===================== +Message Specification +===================== + +Note: not all of these have yet been fully fleshed out, but the key ones are, +see kernel and frontend files for actual implementation details. + +General Message Format +===================== + +General message format:: + + { + header : { 'msg_id' : 10, # start with 0 + 'username' : 'name', + 'session' : uuid + }, + parent_header : dict, + msg_type : 'string_message_type', + content : blackbox_dict , # Must be a dict + } + +Side effect: (PUB/SUB) +====================== + +# msg_type = 'stream' +content = { + name : 'stdout', + data : 'blob', +} + +# msg_type = 'pyin' +content = { + code = 'x=1', +} + +# msg_type = 'pyout' +content = { + data = 'repr(obj)', + prompt_number = 10 +} + +# msg_type = 'pyerr' +content = { + traceback : 'full traceback', + exc_type : 'TypeError', + exc_value : 'msg' +} + +# msg_type = 'file' +content = { + path = 'cool.jpg', + data : 'blob' +} + +Request/Reply +============= + +Execute +------- + +Request: + +# msg_type = 'execute_request' +content = { + code : 'a = 10', +} + +Reply: + +# msg_type = 'execute_reply' +content = { + 'status' : 'ok' OR 'error' OR 'abort' + # data depends on status value +} + +Complete +-------- + +# msg_type = 'complete_request' +content = { + text : 'a.f', # complete on this + line : 'print a.f' # full line +} + +# msg_type = 'complete_reply' +content = { + matches : ['a.foo', 'a.bar'] +} + +Control +------- + +# msg_type = 'heartbeat' +content = { + +}