diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index 002ff3d..e9578cb 100644 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -184,25 +184,25 @@ class EngineFactory(RegistrationFactory): # create iopub stream: iopub_addr = msg.content.iopub - iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop) - iopub_stream.setsockopt(zmq.IDENTITY, identity) - connect(iopub_stream, iopub_addr) + iopub_socket = ctx.socket(zmq.PUB) + iopub_socket.setsockopt(zmq.IDENTITY, identity) + connect(iopub_socket, iopub_addr) # disable history: self.config.HistoryManager.hist_file = ':memory:' # Redirect input streams and set a display hook. if self.out_stream_factory: - sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout') + sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout') sys.stdout.topic = py3compat.cast_bytes('engine.%i.stdout' % self.id) - sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr') + sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr') sys.stderr.topic = py3compat.cast_bytes('engine.%i.stderr' % self.id) if self.display_hook_factory: - sys.displayhook = self.display_hook_factory(self.session, iopub_stream) + sys.displayhook = self.display_hook_factory(self.session, iopub_socket) sys.displayhook.topic = py3compat.cast_bytes('engine.%i.pyout' % self.id) self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session, - control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream, + control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket, loop=loop, user_ns=self.user_ns, log=self.log) self.kernel.start() diff --git a/IPython/zmq/ipkernel.py b/IPython/zmq/ipkernel.py index 39840ca..2a80610 100755 --- a/IPython/zmq/ipkernel.py +++ b/IPython/zmq/ipkernel.py @@ -80,7 +80,7 @@ class Kernel(Configurable): profile_dir = Instance('IPython.core.profiledir.ProfileDir') shell_streams = List() control_stream = Instance(ZMQStream) - iopub_stream = Instance(ZMQStream) + iopub_socket = Instance(zmq.Socket) stdin_socket = Instance(zmq.Socket) log = Instance(logging.Logger) @@ -144,9 +144,9 @@ class Kernel(Configurable): user_ns = self.user_ns, ) self.shell.displayhook.session = self.session - self.shell.displayhook.pub_socket = self.iopub_stream.socket + self.shell.displayhook.pub_socket = self.iopub_socket self.shell.display_pub.session = self.session - self.shell.display_pub.pub_socket = self.iopub_stream.socket + self.shell.display_pub.pub_socket = self.iopub_socket # TMP - hack while developing self.shell._reply_content = None @@ -266,7 +266,6 @@ class Kernel(Configurable): # handle at most one request per iteration stream.flush(zmq.POLLIN, 1) stream.flush(zmq.POLLOUT) - self.iopub_stream.flush() def record_ports(self, ports): @@ -284,14 +283,14 @@ class Kernel(Configurable): def _publish_pyin(self, code, parent, execution_count): """Publish the code request on the pyin stream.""" - self.session.send(self.iopub_stream, u'pyin', + self.session.send(self.iopub_socket, u'pyin', {u'code':code, u'execution_count': execution_count}, parent=parent, ident=self._topic('pyin') ) def execute_request(self, stream, ident, parent): - self.session.send(self.iopub_stream, + self.session.send(self.iopub_socket, u'status', {u'execution_state':u'busy'}, parent=parent, @@ -401,7 +400,7 @@ class Kernel(Configurable): if not silent and reply_msg['content']['status'] == u'error': self._abort_queues() - self.session.send(self.iopub_stream, + self.session.send(self.iopub_socket, u'status', {u'execution_state':u'idle'}, parent=parent, @@ -496,8 +495,8 @@ class Kernel(Configurable): self.log.error("Got bad msg: %s", parent, exc_info=True) return # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) - # self.iopub_stream.send(pyin_msg) - # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent) + # self.iopub_socket.send(pyin_msg) + # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent) sub = {'dependencies_met' : True, 'engine' : self.ident, 'started': datetime.now()} try: @@ -535,7 +534,7 @@ class Kernel(Configurable): except: exc_content = self._wrap_exception('apply') # exc_msg = self.session.msg(u'pyerr', exc_content, parent) - self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent, + self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent, ident=self._topic('pyerr')) reply_content = exc_content result_buf = [] @@ -714,9 +713,9 @@ class Kernel(Configurable): """ # io.rprint("Kernel at_shutdown") # dbg if self._shutdown_message is not None: - self.session.send(self.iopub_stream, self._shutdown_message, ident=self._topic('shutdown')) + self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) self.log.debug("%s", self._shutdown_message) - [ s.flush(zmq.POLLOUT) for s in self.shell_streams + [self.iopub_stream] ] + [ s.flush(zmq.POLLOUT) for s in self.shell_streams ] #----------------------------------------------------------------------------- # Aliases and Flags for the IPKernelApp @@ -771,11 +770,10 @@ class IPKernelApp(KernelApp, InteractiveShellApp): def init_kernel(self): shell_stream = ZMQStream(self.shell_socket) - iopub_stream = ZMQStream(self.iopub_socket) kernel = Kernel(config=self.config, session=self.session, shell_streams=[shell_stream], - iopub_stream=iopub_stream, + iopub_socket=self.iopub_socket, stdin_socket=self.stdin_socket, log=self.log, profile_dir=self.profile_dir,