diff --git a/IPython/zmq/completer.py b/IPython/zmq/completer.py index cf55899..b775b8a 100644 --- a/IPython/zmq/completer.py +++ b/IPython/zmq/completer.py @@ -54,7 +54,8 @@ class ClientCompleter(object): # Give the kernel up to 0.5s to respond for i in range(5): - rep = self.session.recv(self.socket) + ident,rep = self.session.recv(self.socket) + rep = Message(rep) if rep is not None and rep.msg_type == 'complete_reply': matches = rep.content.matches break diff --git a/IPython/zmq/displayhook.py b/IPython/zmq/displayhook.py index f37f30a..eb36beb 100644 --- a/IPython/zmq/displayhook.py +++ b/IPython/zmq/displayhook.py @@ -14,9 +14,8 @@ class DisplayHook(object): return __builtin__._ = obj - msg = self.session.msg(u'pyout', {u'data':repr(obj)}, + msg = self.session.send(self.pub_socket, u'pyout', {u'data':repr(obj)}, parent=self.parent_header) - self.pub_socket.send_json(msg) def set_parent(self, parent): self.parent_header = extract_header(parent) \ No newline at end of file diff --git a/IPython/zmq/frontend.py b/IPython/zmq/frontend.py index 90f3530..39eaffa 100755 --- a/IPython/zmq/frontend.py +++ b/IPython/zmq/frontend.py @@ -92,10 +92,10 @@ class Console(code.InteractiveConsole): def recv_output(self): while True: - omsg = self.session.recv(self.sub_socket) - if omsg is None: + ident,msg = self.session.recv(self.sub_socket) + if msg is None: break - self.handle_output(omsg) + self.handle_output(Message(msg)) def handle_reply(self, rep): # Handle any side effects on output channels @@ -114,9 +114,10 @@ class Console(code.InteractiveConsole): print >> sys.stderr, ab def recv_reply(self): - rep = self.session.recv(self.request_socket) - self.handle_reply(rep) - return rep + ident,rep = self.session.recv(self.request_socket) + mrep = Message(rep) + self.handle_reply(mrep) + return mrep def runcode(self, code): # We can't pickle code objects, so fetch the actual source diff --git a/IPython/zmq/iostream.py b/IPython/zmq/iostream.py index 437933e..f1c962f 100644 --- a/IPython/zmq/iostream.py +++ b/IPython/zmq/iostream.py @@ -37,10 +37,9 @@ class OutStream(object): data = self._buffer.getvalue() if data: content = {u'name':self.name, u'data':data} - msg = self.session.msg(u'stream', content=content, + msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header) io.raw_print(msg) - self.pub_socket.send_json(msg) self._buffer.close() self._new_buffer() diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index 772c798..011608f 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -106,17 +106,14 @@ class Kernel(Configurable): def do_one_iteration(self): """Do one iteration of the kernel's evaluation loop. """ - try: - ident = self.reply_socket.recv(zmq.NOBLOCK) - except zmq.ZMQError, e: - if e.errno == zmq.EAGAIN: - return - else: - raise + ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) + if msg is None: + return + # This assert will raise in versions of zeromq 2.0.7 and lesser. # We now require 2.0.8 or above, so we can uncomment for safety. - assert self.reply_socket.rcvmore(), "Missing message part." - msg = self.reply_socket.recv_json() + # print(ident,msg, file=sys.__stdout__) + assert ident is not None, "Missing message part." # Print some info about this message and leave a '--->' marker, so it's # easier to trace visually the message chain when debugging. Each @@ -169,17 +166,15 @@ class Kernel(Configurable): def _publish_pyin(self, code, parent): """Publish the code request on the pyin stream.""" - pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) - self.pub_socket.send_json(pyin_msg) + pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent) def execute_request(self, ident, parent): - status_msg = self.session.msg( + status_msg = self.session.send(self.pub_socket, u'status', {u'execution_state':u'busy'}, parent=parent ) - self.pub_socket.send_json(status_msg) try: content = parent[u'content'] @@ -264,7 +259,7 @@ class Kernel(Configurable): shell.payload_manager.clear_payload() # Send the reply. - reply_msg = self.session.msg(u'execute_reply', reply_content, parent) + reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident) io.raw_print(reply_msg) # Flush output before sending the reply. @@ -276,17 +271,14 @@ class Kernel(Configurable): if self._execute_sleep: time.sleep(self._execute_sleep) - self.reply_socket.send(ident, zmq.SNDMORE) - self.reply_socket.send_json(reply_msg) if reply_msg['content']['status'] == u'error': self._abort_queue() - status_msg = self.session.msg( + status_msg = self.session.send(self.pub_socket, u'status', {u'execution_state':u'idle'}, parent=parent ) - self.pub_socket.send_json(status_msg) def complete_request(self, ident, parent): txt, matches = self._complete(parent) @@ -335,22 +327,18 @@ class Kernel(Configurable): def _abort_queue(self): while True: - try: - ident = self.reply_socket.recv(zmq.NOBLOCK) - except zmq.ZMQError, e: - if e.errno == zmq.EAGAIN: - break + ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) + if msg is None: + break else: - assert self.reply_socket.rcvmore(), \ + assert ident is not None, \ "Unexpected missing message part." - msg = self.reply_socket.recv_json() io.raw_print("Aborting:\n", Message(msg)) msg_type = msg['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' - reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) + reply_msg = self.session.send(self.reply_socket, reply_type, + {'status' : 'aborted'}, msg, ident=ident) io.raw_print(reply_msg) - self.reply_socket.send(ident,zmq.SNDMORE) - self.reply_socket.send_json(reply_msg) # We need to wait a bit for requests to come in. This can probably # be set shorter for true asynchronous clients. time.sleep(0.1) @@ -362,11 +350,10 @@ class Kernel(Configurable): # Send the input request. content = dict(prompt=prompt) - msg = self.session.msg(u'input_request', content, parent) - self.req_socket.send_json(msg) + msg = self.session.send(self.req_socket, u'input_request', content, parent) # Await a response. - reply = self.req_socket.recv_json() + ident, reply = self.session.recv(self.req_socket, 0) try: value = reply['content']['value'] except: @@ -423,8 +410,8 @@ class Kernel(Configurable): """ # io.rprint("Kernel at_shutdown") # dbg if self._shutdown_message is not None: - self.reply_socket.send_json(self._shutdown_message) - self.pub_socket.send_json(self._shutdown_message) + self.session.send(self.reply_socket, self._shutdown_message) + self.session.send(self.pub_socket, self._shutdown_message) io.raw_print(self._shutdown_message) # A very short sleep to give zmq time to flush its message buffers # before Python truly shuts down. diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 089e519..002d12e 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -33,7 +33,7 @@ from zmq.eventloop import ioloop from IPython.utils import io from IPython.utils.localinterfaces import LOCALHOST, LOCAL_IPS from IPython.utils.traitlets import HasTraits, Any, Instance, Type, TCPAddress -from session import Session +from session import Session, Message #----------------------------------------------------------------------------- # Constants and exceptions @@ -330,7 +330,7 @@ class XReqSocketChannel(ZmqSocketChannel): self._handle_recv() def _handle_recv(self): - msg = self.socket.recv_json() + ident,msg = self.session.recv(self.socket, 0) self.call_handlers(msg) def _handle_send(self): @@ -339,7 +339,7 @@ class XReqSocketChannel(ZmqSocketChannel): except Empty: pass else: - self.socket.send_json(msg) + self.session.send(self.socket,msg) if self.command_queue.empty(): self.drop_io_state(POLLOUT) @@ -424,12 +424,14 @@ class SubSocketChannel(ZmqSocketChannel): # Get all of the messages we can while True: try: - msg = self.socket.recv_json(zmq.NOBLOCK) + ident,msg = self.session.recv(self.socket) except zmq.ZMQError: # Check the errno? # Will this trigger POLLERR? break else: + if msg is None: + break self.call_handlers(msg) def _flush(self): @@ -486,7 +488,7 @@ class RepSocketChannel(ZmqSocketChannel): self._handle_recv() def _handle_recv(self): - msg = self.socket.recv_json() + ident,msg = self.session.recv(self.socket, 0) self.call_handlers(msg) def _handle_send(self): @@ -495,7 +497,7 @@ class RepSocketChannel(ZmqSocketChannel): except Empty: pass else: - self.socket.send_json(msg) + self.session.send(self.socket,msg) if self.msg_queue.empty(): self.drop_io_state(POLLOUT) @@ -546,7 +548,7 @@ class HBSocketChannel(ZmqSocketChannel): request_time = time.time() try: #io.rprint('Ping from HB channel') # dbg - self.socket.send_json('ping') + self.socket.send(b'ping') except zmq.ZMQError, e: #io.rprint('*** HB Error:', e) # dbg if e.errno == zmq.EFSM: @@ -558,7 +560,7 @@ class HBSocketChannel(ZmqSocketChannel): else: while True: try: - self.socket.recv_json(zmq.NOBLOCK) + self.socket.recv(zmq.NOBLOCK) except zmq.ZMQError, e: #io.rprint('*** HB Error 2:', e) # dbg if e.errno == zmq.EAGAIN: diff --git a/IPython/zmq/pykernel.py b/IPython/zmq/pykernel.py index 8a8298c..8657add 100755 --- a/IPython/zmq/pykernel.py +++ b/IPython/zmq/pykernel.py @@ -69,9 +69,8 @@ class Kernel(HasTraits): """ Start the kernel main loop. """ while True: - ident = self.reply_socket.recv() - assert self.reply_socket.rcvmore(), "Missing message part." - msg = self.reply_socket.recv_json() + ident,msg = self.session.recv(self.reply_socket,0) + assert ident is not None, "Missing message part." omsg = Message(msg) print>>sys.__stdout__ print>>sys.__stdout__, omsg @@ -105,8 +104,7 @@ class Kernel(HasTraits): print>>sys.__stderr__, "Got bad msg: " print>>sys.__stderr__, Message(parent) return - pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) - self.pub_socket.send_json(pyin_msg) + pyin_msg = self.session.send(self.pub_socket, u'pyin',{u'code':code}, parent=parent) try: comp_code = self.compiler(code, '') @@ -131,8 +129,7 @@ class Kernel(HasTraits): u'ename' : unicode(etype.__name__), u'evalue' : unicode(evalue) } - exc_msg = self.session.msg(u'pyerr', exc_content, parent) - self.pub_socket.send_json(exc_msg) + exc_msg = self.session.send(self.pub_socket, u'pyerr', exc_content, parent) reply_content = exc_content else: reply_content = { 'status' : 'ok', 'payload' : {} } @@ -142,10 +139,8 @@ class Kernel(HasTraits): sys.stdout.flush() # Send the reply. - reply_msg = self.session.msg(u'execute_reply', reply_content, parent) + reply_msg = self.session.send(self.reply_socket, u'execute_reply', reply_content, parent, ident=ident) print>>sys.__stdout__, Message(reply_msg) - self.reply_socket.send(ident, zmq.SNDMORE) - self.reply_socket.send_json(reply_msg) if reply_msg['content']['status'] == u'error': self._abort_queue() @@ -180,21 +175,18 @@ class Kernel(HasTraits): def _abort_queue(self): while True: try: - ident = self.reply_socket.recv(zmq.NOBLOCK) + ident,msg = self.session.recv(self.reply_socket, zmq.NOBLOCK) except zmq.ZMQError, e: if e.errno == zmq.EAGAIN: break else: - assert self.reply_socket.rcvmore(), "Missing message part." - msg = self.reply_socket.recv_json() + assert ident is not None, "Missing message part." print>>sys.__stdout__, "Aborting:" print>>sys.__stdout__, Message(msg) msg_type = msg['msg_type'] reply_type = msg_type.split('_')[0] + '_reply' - reply_msg = self.session.msg(reply_type, {'status':'aborted'}, msg) + reply_msg = self.session.send(self.reply_socket, reply_type, {'status':'aborted'}, msg, ident=ident) print>>sys.__stdout__, Message(reply_msg) - self.reply_socket.send(ident,zmq.SNDMORE) - self.reply_socket.send_json(reply_msg) # We need to wait a bit for requests to come in. This can probably # be set shorter for true asynchronous clients. time.sleep(0.1) @@ -206,11 +198,10 @@ class Kernel(HasTraits): # Send the input request. content = dict(prompt=prompt) - msg = self.session.msg(u'input_request', content, parent) - self.req_socket.send_json(msg) + msg = self.session.send(self.req_socket, u'input_request', content, parent) # Await a response. - reply = self.req_socket.recv_json() + ident,reply = self.session.recv(self.req_socket, 0) try: value = reply['content']['value'] except: diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py index de1de3e..337f755 100644 --- a/IPython/zmq/session.py +++ b/IPython/zmq/session.py @@ -4,6 +4,8 @@ import pprint import zmq +from zmq.utils import jsonapi as json + class Message(object): """A simple message object that maps dict keys to attributes. @@ -86,24 +88,35 @@ class Session(object): return msg def send(self, socket, msg_type, content=None, parent=None, ident=None): - msg = self.msg(msg_type, content, parent) + if isinstance(msg_type, (Message, dict)): + msg = dict(msg_type) + else: + msg = self.msg(msg_type, content, parent) if ident is not None: socket.send(ident, zmq.SNDMORE) socket.send_json(msg) - omsg = Message(msg) - return omsg - + # omsg = Message(msg) + return msg + def recv(self, socket, mode=zmq.NOBLOCK): try: - msg = socket.recv_json(mode) + msg = socket.recv_multipart(mode) except zmq.ZMQError, e: if e.errno == zmq.EAGAIN: # We can convert EAGAIN to None as we know in this case # recv_json won't return None. - return None + return None,None else: raise - return Message(msg) + if len(msg) == 1: + ident=None + msg = msg[0] + elif len(msg) == 2: + ident, msg = msg + else: + raise ValueError("Got message with length > 2, which is invalid") + + return ident, json.loads(msg) def test_msg2obj(): am = dict(x=1) diff --git a/IPython/zmq/zmqshell.py b/IPython/zmq/zmqshell.py index 51d8501..1eda3d9 100644 --- a/IPython/zmq/zmqshell.py +++ b/IPython/zmq/zmqshell.py @@ -71,7 +71,7 @@ class ZMQDisplayHook(DisplayHook): def finish_displayhook(self): """Finish up all displayhook activities.""" - self.pub_socket.send_json(self.msg) + self.session.send(self.pub_socket, self.msg) self.msg = None @@ -126,10 +126,9 @@ class ZMQInteractiveShell(InteractiveShell): } dh = self.displayhook - exc_msg = dh.session.msg(u'pyerr', exc_content, dh.parent_header) # Send exception info over pub socket for other clients than the caller # to pick up - dh.pub_socket.send_json(exc_msg) + exc_msg = dh.session.send(dh.pub_socket, u'pyerr', exc_content, dh.parent_header) # FIXME - Hack: store exception info in shell object. Right now, the # caller is reading this info after the fact, we need to fix this logic