diff --git a/IPython/frontend/html/notebook/notebookapp.py b/IPython/frontend/html/notebook/notebookapp.py
index bdef5ad..24f0188 100644
--- a/IPython/frontend/html/notebook/notebookapp.py
+++ b/IPython/frontend/html/notebook/notebookapp.py
@@ -32,8 +32,7 @@ import zmq
# Install the pyzmq ioloop. This has to be done before anything else from
# tornado is imported.
from zmq.eventloop import ioloop
-import tornado.ioloop
-tornado.ioloop.IOLoop = ioloop.IOLoop
+ioloop.install()
from tornado import httpserver
from tornado import web
diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py
index 35ca919..e5670bf 100644
--- a/IPython/zmq/kernelmanager.py
+++ b/IPython/zmq/kernelmanager.py
@@ -18,7 +18,6 @@ TODO
# Standard library imports.
import errno
import json
-from Queue import Queue, Empty
from subprocess import Popen
import os
import signal
@@ -28,8 +27,7 @@ import time
# System library imports.
import zmq
-from zmq import POLLIN, POLLOUT, POLLERR
-from zmq.eventloop import ioloop
+from zmq.eventloop import ioloop, zmqstream
# Local imports.
from IPython.config.loader import Config
@@ -88,7 +86,7 @@ class ZMQSocketChannel(Thread):
session = None
socket = None
ioloop = None
- iostate = None
+ stream = None
_address = None
def __init__(self, context, session, address):
@@ -144,37 +142,28 @@ class ZMQSocketChannel(Thread):
"""
return self._address
- def add_io_state(self, state):
- """Add IO state to the eventloop.
-
+ def _queue_send(self, msg):
+ """Queue a message to be sent from the IOLoop's thread.
+
Parameters
----------
- state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
- The IO state flag to set.
-
- This is thread safe as it uses the thread safe IOLoop.add_callback.
+ msg : message to send
+
+ This is threadsafe, as it uses IOLoop.add_callback to give the loop's
+ thread control of the action.
"""
- def add_io_state_callback():
- if not self.iostate & state:
- self.iostate = self.iostate | state
- self.ioloop.update_handler(self.socket, self.iostate)
- self.ioloop.add_callback(add_io_state_callback)
-
- def drop_io_state(self, state):
- """Drop IO state from the eventloop.
-
- Parameters
- ----------
- state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR
- The IO state flag to set.
+ def thread_send():
+ self.session.send(self.stream, msg)
+ self.ioloop.add_callback(thread_send)
- This is thread safe as it uses the thread safe IOLoop.add_callback.
+ def _handle_recv(self, msg):
+ """callback for stream.on_recv
+
+ unpacks message, and calls handlers with it.
"""
- def drop_io_state_callback():
- if self.iostate & state:
- self.iostate = self.iostate & (~state)
- self.ioloop.update_handler(self.socket, self.iostate)
- self.ioloop.add_callback(drop_io_state_callback)
+ ident,smsg = self.session.feed_identities(msg)
+ self.call_handlers(self.session.unserialize(smsg))
+
class ShellSocketChannel(ZMQSocketChannel):
@@ -187,7 +176,6 @@ class ShellSocketChannel(ZMQSocketChannel):
def __init__(self, context, session, address):
super(ShellSocketChannel, self).__init__(context, session, address)
- self.command_queue = Queue()
self.ioloop = ioloop.IOLoop()
def run(self):
@@ -195,9 +183,8 @@ class ShellSocketChannel(ZMQSocketChannel):
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
self.socket.connect('tcp://%s:%i' % self.address)
- self.iostate = POLLERR|POLLIN
- self.ioloop.add_handler(self.socket, self._handle_events,
- self.iostate)
+ self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
+ self.stream.on_recv(self._handle_recv)
self._run_loop()
def stop(self):
@@ -268,7 +255,7 @@ class ShellSocketChannel(ZMQSocketChannel):
allow_stdin=allow_stdin,
)
msg = self.session.msg('execute_request', content)
- self._queue_request(msg)
+ self._queue_send(msg)
return msg['header']['msg_id']
def complete(self, text, line, cursor_pos, block=None):
@@ -293,7 +280,7 @@ class ShellSocketChannel(ZMQSocketChannel):
"""
content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos)
msg = self.session.msg('complete_request', content)
- self._queue_request(msg)
+ self._queue_send(msg)
return msg['header']['msg_id']
def object_info(self, oname):
@@ -310,7 +297,7 @@ class ShellSocketChannel(ZMQSocketChannel):
"""
content = dict(oname=oname)
msg = self.session.msg('object_info_request', content)
- self._queue_request(msg)
+ self._queue_send(msg)
return msg['header']['msg_id']
def history(self, raw=True, output=False, hist_access_type='range', **kwargs):
@@ -348,7 +335,7 @@ class ShellSocketChannel(ZMQSocketChannel):
content = dict(raw=raw, output=output, hist_access_type=hist_access_type,
**kwargs)
msg = self.session.msg('history_request', content)
- self._queue_request(msg)
+ self._queue_send(msg)
return msg['header']['msg_id']
def shutdown(self, restart=False):
@@ -365,38 +352,9 @@ class ShellSocketChannel(ZMQSocketChannel):
# Send quit message to kernel. Once we implement kernel-side setattr,
# this should probably be done that way, but for now this will do.
msg = self.session.msg('shutdown_request', {'restart':restart})
- self._queue_request(msg)
+ self._queue_send(msg)
return msg['header']['msg_id']
- def _handle_events(self, socket, events):
- if events & POLLERR:
- self._handle_err()
- if events & POLLOUT:
- self._handle_send()
- if events & POLLIN:
- self._handle_recv()
-
- def _handle_recv(self):
- ident,msg = self.session.recv(self.socket, 0)
- self.call_handlers(msg)
-
- def _handle_send(self):
- try:
- msg = self.command_queue.get(False)
- except Empty:
- pass
- else:
- self.session.send(self.socket,msg)
- if self.command_queue.empty():
- self.drop_io_state(POLLOUT)
-
- def _handle_err(self):
- # We don't want to let this go silently, so eventually we should log.
- raise zmq.ZMQError()
-
- def _queue_request(self, msg):
- self.command_queue.put(msg)
- self.add_io_state(POLLOUT)
class SubSocketChannel(ZMQSocketChannel):
@@ -413,9 +371,8 @@ class SubSocketChannel(ZMQSocketChannel):
self.socket.setsockopt(zmq.SUBSCRIBE,b'')
self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
self.socket.connect('tcp://%s:%i' % self.address)
- self.iostate = POLLIN|POLLERR
- self.ioloop.add_handler(self.socket, self._handle_events,
- self.iostate)
+ self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
+ self.stream.on_recv(self._handle_recv)
self._run_loop()
def stop(self):
@@ -456,33 +413,9 @@ class SubSocketChannel(ZMQSocketChannel):
while not self._flushed and time.time() < stop_time:
time.sleep(0.01)
- def _handle_events(self, socket, events):
- # Turn on and off POLLOUT depending on if we have made a request
- if events & POLLERR:
- self._handle_err()
- if events & POLLIN:
- self._handle_recv()
-
- def _handle_err(self):
- # We don't want to let this go silently, so eventually we should log.
- raise zmq.ZMQError()
-
- def _handle_recv(self):
- # Get all of the messages we can
- while True:
- try:
- ident,msg = self.session.recv(self.socket)
- except zmq.ZMQError:
- # Check the errno?
- # Will this trigger POLLERR?
- break
- else:
- if msg is None:
- break
- self.call_handlers(msg)
-
def _flush(self):
"""Callback for :method:`self.flush`."""
+ self.stream.flush()
self._flushed = True
@@ -494,16 +427,14 @@ class StdInSocketChannel(ZMQSocketChannel):
def __init__(self, context, session, address):
super(StdInSocketChannel, self).__init__(context, session, address)
self.ioloop = ioloop.IOLoop()
- self.msg_queue = Queue()
def run(self):
"""The thread's main activity. Call start() instead."""
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.session.bsession)
self.socket.connect('tcp://%s:%i' % self.address)
- self.iostate = POLLERR|POLLIN
- self.ioloop.add_handler(self.socket, self._handle_events,
- self.iostate)
+ self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
+ self.stream.on_recv(self._handle_recv)
self._run_loop()
def stop(self):
@@ -524,37 +455,7 @@ class StdInSocketChannel(ZMQSocketChannel):
"""Send a string of raw input to the kernel."""
content = dict(value=string)
msg = self.session.msg('input_reply', content)
- self._queue_reply(msg)
-
- def _handle_events(self, socket, events):
- if events & POLLERR:
- self._handle_err()
- if events & POLLOUT:
- self._handle_send()
- if events & POLLIN:
- self._handle_recv()
-
- def _handle_recv(self):
- ident,msg = self.session.recv(self.socket, 0)
- self.call_handlers(msg)
-
- def _handle_send(self):
- try:
- msg = self.msg_queue.get(False)
- except Empty:
- pass
- else:
- self.session.send(self.socket,msg)
- if self.msg_queue.empty():
- self.drop_io_state(POLLOUT)
-
- def _handle_err(self):
- # We don't want to let this go silently, so eventually we should log.
- raise zmq.ZMQError()
-
- def _queue_reply(self, msg):
- self.msg_queue.put(msg)
- self.add_io_state(POLLOUT)
+ self._queue_send(msg)
class HBSocketChannel(ZMQSocketChannel):