diff --git a/IPython/frontend/html/notebook/notebookapp.py b/IPython/frontend/html/notebook/notebookapp.py index bdef5ad..24f0188 100644 --- a/IPython/frontend/html/notebook/notebookapp.py +++ b/IPython/frontend/html/notebook/notebookapp.py @@ -32,8 +32,7 @@ import zmq # Install the pyzmq ioloop. This has to be done before anything else from # tornado is imported. from zmq.eventloop import ioloop -import tornado.ioloop -tornado.ioloop.IOLoop = ioloop.IOLoop +ioloop.install() from tornado import httpserver from tornado import web diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 35ca919..e5670bf 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -18,7 +18,6 @@ TODO # Standard library imports. import errno import json -from Queue import Queue, Empty from subprocess import Popen import os import signal @@ -28,8 +27,7 @@ import time # System library imports. import zmq -from zmq import POLLIN, POLLOUT, POLLERR -from zmq.eventloop import ioloop +from zmq.eventloop import ioloop, zmqstream # Local imports. from IPython.config.loader import Config @@ -88,7 +86,7 @@ class ZMQSocketChannel(Thread): session = None socket = None ioloop = None - iostate = None + stream = None _address = None def __init__(self, context, session, address): @@ -144,37 +142,28 @@ class ZMQSocketChannel(Thread): """ return self._address - def add_io_state(self, state): - """Add IO state to the eventloop. - + def _queue_send(self, msg): + """Queue a message to be sent from the IOLoop's thread. + Parameters ---------- - state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR - The IO state flag to set. - - This is thread safe as it uses the thread safe IOLoop.add_callback. + msg : message to send + + This is threadsafe, as it uses IOLoop.add_callback to give the loop's + thread control of the action. """ - def add_io_state_callback(): - if not self.iostate & state: - self.iostate = self.iostate | state - self.ioloop.update_handler(self.socket, self.iostate) - self.ioloop.add_callback(add_io_state_callback) - - def drop_io_state(self, state): - """Drop IO state from the eventloop. - - Parameters - ---------- - state : zmq.POLLIN|zmq.POLLOUT|zmq.POLLERR - The IO state flag to set. + def thread_send(): + self.session.send(self.stream, msg) + self.ioloop.add_callback(thread_send) - This is thread safe as it uses the thread safe IOLoop.add_callback. + def _handle_recv(self, msg): + """callback for stream.on_recv + + unpacks message, and calls handlers with it. """ - def drop_io_state_callback(): - if self.iostate & state: - self.iostate = self.iostate & (~state) - self.ioloop.update_handler(self.socket, self.iostate) - self.ioloop.add_callback(drop_io_state_callback) + ident,smsg = self.session.feed_identities(msg) + self.call_handlers(self.session.unserialize(smsg)) + class ShellSocketChannel(ZMQSocketChannel): @@ -187,7 +176,6 @@ class ShellSocketChannel(ZMQSocketChannel): def __init__(self, context, session, address): super(ShellSocketChannel, self).__init__(context, session, address) - self.command_queue = Queue() self.ioloop = ioloop.IOLoop() def run(self): @@ -195,9 +183,8 @@ class ShellSocketChannel(ZMQSocketChannel): self.socket = self.context.socket(zmq.DEALER) self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) self.socket.connect('tcp://%s:%i' % self.address) - self.iostate = POLLERR|POLLIN - self.ioloop.add_handler(self.socket, self._handle_events, - self.iostate) + self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) + self.stream.on_recv(self._handle_recv) self._run_loop() def stop(self): @@ -268,7 +255,7 @@ class ShellSocketChannel(ZMQSocketChannel): allow_stdin=allow_stdin, ) msg = self.session.msg('execute_request', content) - self._queue_request(msg) + self._queue_send(msg) return msg['header']['msg_id'] def complete(self, text, line, cursor_pos, block=None): @@ -293,7 +280,7 @@ class ShellSocketChannel(ZMQSocketChannel): """ content = dict(text=text, line=line, block=block, cursor_pos=cursor_pos) msg = self.session.msg('complete_request', content) - self._queue_request(msg) + self._queue_send(msg) return msg['header']['msg_id'] def object_info(self, oname): @@ -310,7 +297,7 @@ class ShellSocketChannel(ZMQSocketChannel): """ content = dict(oname=oname) msg = self.session.msg('object_info_request', content) - self._queue_request(msg) + self._queue_send(msg) return msg['header']['msg_id'] def history(self, raw=True, output=False, hist_access_type='range', **kwargs): @@ -348,7 +335,7 @@ class ShellSocketChannel(ZMQSocketChannel): content = dict(raw=raw, output=output, hist_access_type=hist_access_type, **kwargs) msg = self.session.msg('history_request', content) - self._queue_request(msg) + self._queue_send(msg) return msg['header']['msg_id'] def shutdown(self, restart=False): @@ -365,38 +352,9 @@ class ShellSocketChannel(ZMQSocketChannel): # Send quit message to kernel. Once we implement kernel-side setattr, # this should probably be done that way, but for now this will do. msg = self.session.msg('shutdown_request', {'restart':restart}) - self._queue_request(msg) + self._queue_send(msg) return msg['header']['msg_id'] - def _handle_events(self, socket, events): - if events & POLLERR: - self._handle_err() - if events & POLLOUT: - self._handle_send() - if events & POLLIN: - self._handle_recv() - - def _handle_recv(self): - ident,msg = self.session.recv(self.socket, 0) - self.call_handlers(msg) - - def _handle_send(self): - try: - msg = self.command_queue.get(False) - except Empty: - pass - else: - self.session.send(self.socket,msg) - if self.command_queue.empty(): - self.drop_io_state(POLLOUT) - - def _handle_err(self): - # We don't want to let this go silently, so eventually we should log. - raise zmq.ZMQError() - - def _queue_request(self, msg): - self.command_queue.put(msg) - self.add_io_state(POLLOUT) class SubSocketChannel(ZMQSocketChannel): @@ -413,9 +371,8 @@ class SubSocketChannel(ZMQSocketChannel): self.socket.setsockopt(zmq.SUBSCRIBE,b'') self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) self.socket.connect('tcp://%s:%i' % self.address) - self.iostate = POLLIN|POLLERR - self.ioloop.add_handler(self.socket, self._handle_events, - self.iostate) + self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) + self.stream.on_recv(self._handle_recv) self._run_loop() def stop(self): @@ -456,33 +413,9 @@ class SubSocketChannel(ZMQSocketChannel): while not self._flushed and time.time() < stop_time: time.sleep(0.01) - def _handle_events(self, socket, events): - # Turn on and off POLLOUT depending on if we have made a request - if events & POLLERR: - self._handle_err() - if events & POLLIN: - self._handle_recv() - - def _handle_err(self): - # We don't want to let this go silently, so eventually we should log. - raise zmq.ZMQError() - - def _handle_recv(self): - # Get all of the messages we can - while True: - try: - ident,msg = self.session.recv(self.socket) - except zmq.ZMQError: - # Check the errno? - # Will this trigger POLLERR? - break - else: - if msg is None: - break - self.call_handlers(msg) - def _flush(self): """Callback for :method:`self.flush`.""" + self.stream.flush() self._flushed = True @@ -494,16 +427,14 @@ class StdInSocketChannel(ZMQSocketChannel): def __init__(self, context, session, address): super(StdInSocketChannel, self).__init__(context, session, address) self.ioloop = ioloop.IOLoop() - self.msg_queue = Queue() def run(self): """The thread's main activity. Call start() instead.""" self.socket = self.context.socket(zmq.DEALER) self.socket.setsockopt(zmq.IDENTITY, self.session.bsession) self.socket.connect('tcp://%s:%i' % self.address) - self.iostate = POLLERR|POLLIN - self.ioloop.add_handler(self.socket, self._handle_events, - self.iostate) + self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) + self.stream.on_recv(self._handle_recv) self._run_loop() def stop(self): @@ -524,37 +455,7 @@ class StdInSocketChannel(ZMQSocketChannel): """Send a string of raw input to the kernel.""" content = dict(value=string) msg = self.session.msg('input_reply', content) - self._queue_reply(msg) - - def _handle_events(self, socket, events): - if events & POLLERR: - self._handle_err() - if events & POLLOUT: - self._handle_send() - if events & POLLIN: - self._handle_recv() - - def _handle_recv(self): - ident,msg = self.session.recv(self.socket, 0) - self.call_handlers(msg) - - def _handle_send(self): - try: - msg = self.msg_queue.get(False) - except Empty: - pass - else: - self.session.send(self.socket,msg) - if self.msg_queue.empty(): - self.drop_io_state(POLLOUT) - - def _handle_err(self): - # We don't want to let this go silently, so eventually we should log. - raise zmq.ZMQError() - - def _queue_reply(self, msg): - self.msg_queue.put(msg) - self.add_io_state(POLLOUT) + self._queue_send(msg) class HBSocketChannel(ZMQSocketChannel):