##// END OF EJS Templates
do not use ZMQStream for IOPub...
MinRK -
Show More
@@ -184,25 +184,25 b' class EngineFactory(RegistrationFactory):'
184
184
185 # create iopub stream:
185 # create iopub stream:
186 iopub_addr = msg.content.iopub
186 iopub_addr = msg.content.iopub
187 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
187 iopub_socket = ctx.socket(zmq.PUB)
188 iopub_stream.setsockopt(zmq.IDENTITY, identity)
188 iopub_socket.setsockopt(zmq.IDENTITY, identity)
189 connect(iopub_stream, iopub_addr)
189 connect(iopub_socket, iopub_addr)
190
190
191 # disable history:
191 # disable history:
192 self.config.HistoryManager.hist_file = ':memory:'
192 self.config.HistoryManager.hist_file = ':memory:'
193
193
194 # Redirect input streams and set a display hook.
194 # Redirect input streams and set a display hook.
195 if self.out_stream_factory:
195 if self.out_stream_factory:
196 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
196 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
197 sys.stdout.topic = py3compat.cast_bytes('engine.%i.stdout' % self.id)
197 sys.stdout.topic = py3compat.cast_bytes('engine.%i.stdout' % self.id)
198 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
198 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
199 sys.stderr.topic = py3compat.cast_bytes('engine.%i.stderr' % self.id)
199 sys.stderr.topic = py3compat.cast_bytes('engine.%i.stderr' % self.id)
200 if self.display_hook_factory:
200 if self.display_hook_factory:
201 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
201 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
202 sys.displayhook.topic = py3compat.cast_bytes('engine.%i.pyout' % self.id)
202 sys.displayhook.topic = py3compat.cast_bytes('engine.%i.pyout' % self.id)
203
203
204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
204 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
205 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
205 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
206 loop=loop, user_ns=self.user_ns, log=self.log)
206 loop=loop, user_ns=self.user_ns, log=self.log)
207 self.kernel.start()
207 self.kernel.start()
208
208
@@ -80,7 +80,7 b' class Kernel(Configurable):'
80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
80 profile_dir = Instance('IPython.core.profiledir.ProfileDir')
81 shell_streams = List()
81 shell_streams = List()
82 control_stream = Instance(ZMQStream)
82 control_stream = Instance(ZMQStream)
83 iopub_stream = Instance(ZMQStream)
83 iopub_socket = Instance(zmq.Socket)
84 stdin_socket = Instance(zmq.Socket)
84 stdin_socket = Instance(zmq.Socket)
85 log = Instance(logging.Logger)
85 log = Instance(logging.Logger)
86
86
@@ -144,9 +144,9 b' class Kernel(Configurable):'
144 user_ns = self.user_ns,
144 user_ns = self.user_ns,
145 )
145 )
146 self.shell.displayhook.session = self.session
146 self.shell.displayhook.session = self.session
147 self.shell.displayhook.pub_socket = self.iopub_stream.socket
147 self.shell.displayhook.pub_socket = self.iopub_socket
148 self.shell.display_pub.session = self.session
148 self.shell.display_pub.session = self.session
149 self.shell.display_pub.pub_socket = self.iopub_stream.socket
149 self.shell.display_pub.pub_socket = self.iopub_socket
150
150
151 # TMP - hack while developing
151 # TMP - hack while developing
152 self.shell._reply_content = None
152 self.shell._reply_content = None
@@ -266,7 +266,6 b' class Kernel(Configurable):'
266 # handle at most one request per iteration
266 # handle at most one request per iteration
267 stream.flush(zmq.POLLIN, 1)
267 stream.flush(zmq.POLLIN, 1)
268 stream.flush(zmq.POLLOUT)
268 stream.flush(zmq.POLLOUT)
269 self.iopub_stream.flush()
270
269
271
270
272 def record_ports(self, ports):
271 def record_ports(self, ports):
@@ -284,14 +283,14 b' class Kernel(Configurable):'
284 def _publish_pyin(self, code, parent, execution_count):
283 def _publish_pyin(self, code, parent, execution_count):
285 """Publish the code request on the pyin stream."""
284 """Publish the code request on the pyin stream."""
286
285
287 self.session.send(self.iopub_stream, u'pyin',
286 self.session.send(self.iopub_socket, u'pyin',
288 {u'code':code, u'execution_count': execution_count},
287 {u'code':code, u'execution_count': execution_count},
289 parent=parent, ident=self._topic('pyin')
288 parent=parent, ident=self._topic('pyin')
290 )
289 )
291
290
292 def execute_request(self, stream, ident, parent):
291 def execute_request(self, stream, ident, parent):
293
292
294 self.session.send(self.iopub_stream,
293 self.session.send(self.iopub_socket,
295 u'status',
294 u'status',
296 {u'execution_state':u'busy'},
295 {u'execution_state':u'busy'},
297 parent=parent,
296 parent=parent,
@@ -401,7 +400,7 b' class Kernel(Configurable):'
401 if not silent and reply_msg['content']['status'] == u'error':
400 if not silent and reply_msg['content']['status'] == u'error':
402 self._abort_queues()
401 self._abort_queues()
403
402
404 self.session.send(self.iopub_stream,
403 self.session.send(self.iopub_socket,
405 u'status',
404 u'status',
406 {u'execution_state':u'idle'},
405 {u'execution_state':u'idle'},
407 parent=parent,
406 parent=parent,
@@ -496,8 +495,8 b' class Kernel(Configurable):'
496 self.log.error("Got bad msg: %s", parent, exc_info=True)
495 self.log.error("Got bad msg: %s", parent, exc_info=True)
497 return
496 return
498 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
497 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
499 # self.iopub_stream.send(pyin_msg)
498 # self.iopub_socket.send(pyin_msg)
500 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
499 # self.session.send(self.iopub_socket, u'pyin', {u'code':code},parent=parent)
501 sub = {'dependencies_met' : True, 'engine' : self.ident,
500 sub = {'dependencies_met' : True, 'engine' : self.ident,
502 'started': datetime.now()}
501 'started': datetime.now()}
503 try:
502 try:
@@ -535,7 +534,7 b' class Kernel(Configurable):'
535 except:
534 except:
536 exc_content = self._wrap_exception('apply')
535 exc_content = self._wrap_exception('apply')
537 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
536 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
538 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
537 self.session.send(self.iopub_socket, u'pyerr', exc_content, parent=parent,
539 ident=self._topic('pyerr'))
538 ident=self._topic('pyerr'))
540 reply_content = exc_content
539 reply_content = exc_content
541 result_buf = []
540 result_buf = []
@@ -714,9 +713,9 b' class Kernel(Configurable):'
714 """
713 """
715 # io.rprint("Kernel at_shutdown") # dbg
714 # io.rprint("Kernel at_shutdown") # dbg
716 if self._shutdown_message is not None:
715 if self._shutdown_message is not None:
717 self.session.send(self.iopub_stream, self._shutdown_message, ident=self._topic('shutdown'))
716 self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown'))
718 self.log.debug("%s", self._shutdown_message)
717 self.log.debug("%s", self._shutdown_message)
719 [ s.flush(zmq.POLLOUT) for s in self.shell_streams + [self.iopub_stream] ]
718 [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]
720
719
721 #-----------------------------------------------------------------------------
720 #-----------------------------------------------------------------------------
722 # Aliases and Flags for the IPKernelApp
721 # Aliases and Flags for the IPKernelApp
@@ -771,11 +770,10 b' class IPKernelApp(KernelApp, InteractiveShellApp):'
771 def init_kernel(self):
770 def init_kernel(self):
772
771
773 shell_stream = ZMQStream(self.shell_socket)
772 shell_stream = ZMQStream(self.shell_socket)
774 iopub_stream = ZMQStream(self.iopub_socket)
775
773
776 kernel = Kernel(config=self.config, session=self.session,
774 kernel = Kernel(config=self.config, session=self.session,
777 shell_streams=[shell_stream],
775 shell_streams=[shell_stream],
778 iopub_stream=iopub_stream,
776 iopub_socket=self.iopub_socket,
779 stdin_socket=self.stdin_socket,
777 stdin_socket=self.stdin_socket,
780 log=self.log,
778 log=self.log,
781 profile_dir=self.profile_dir,
779 profile_dir=self.profile_dir,
General Comments 0
You need to be logged in to leave comments. Login now