sessionmanager.py
90 lines
| 3.0 KiB
| text/x-python
|
PythonLexer
Brian E. Granger
|
r4343 | """A manager for session and channels for a single kernel.""" | ||
Brian E. Granger
|
r4344 | #----------------------------------------------------------------------------- | ||
Brian E. Granger
|
r4348 | # Copyright (C) 2011 The IPython Development Team | ||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING.txt, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
Brian E. Granger
|
r4344 | # Imports | ||
#----------------------------------------------------------------------------- | ||||
Brian E. Granger
|
r4343 | import zmq | ||
from zmq.eventloop.zmqstream import ZMQStream | ||||
from IPython.utils.traitlets import Instance, Dict, CBytes, Bool | ||||
from IPython.zmq.session import SessionFactory | ||||
class SessionManagerRunningError(Exception):
    """Raised when start() is called on a session manager that is already running."""
Brian E. Granger
|
r4344 | #----------------------------------------------------------------------------- | ||
# Classes | ||||
#----------------------------------------------------------------------------- | ||||
Brian E. Granger
|
r4343 | |||
class SessionManager(SessionFactory):
    """Manages a session for a kernel.

    The object manages a variety of things for a connection session to
    a running kernel:

    * The set of channels or connected ZMQ streams to the kernel.
    * An IPython.zmq.session.Session object that manages send/recv logic
      for those channels.

    The streams are connected as soon as the manager is constructed and
    are closed by ``stop()``.
    """

    # Queried for the kernel's IP address and port numbers.
    kernel_manager = Instance('IPython.frontend.html.notebook.kernelmanager.KernelManager')
    # Identifier handed to the kernel manager when looking up ip/ports.
    kernel_id = CBytes(b'')
    # Open streams keyed by 'iopub_stream' and 'shell_stream'; empty when stopped.
    _session_streams = Dict()
    # True between a successful start() and the following stop().
    _running = Bool(False)

    def __init__(self, **kwargs):
        """Create the manager and immediately connect to the kernel.

        ``kernel_id`` is a required keyword argument (``KeyError`` if it is
        missing); every other keyword is forwarded to the parent class.
        """
        kernel_id = kwargs.pop('kernel_id')
        super(SessionManager, self).__init__(**kwargs)
        # Assigned only after the parent initializer has run.
        self.kernel_id = kernel_id
        self.start()

    def __del__(self):
        """Close any open streams when the manager is garbage collected."""
        self.stop()

    def start(self):
        """Connect the iopub and shell streams to the kernel.

        Raises
        ------
        SessionManagerRunningError
            If the manager is already running.
        """
        if self._running:
            raise SessionManagerRunningError(
                'Session manager is already running, call stop() before start()'
            )
        ports = self.kernel_manager.get_kernel_ports(self.kernel_id)
        iopub_stream = self.create_connected_stream(ports['iopub_port'], zmq.SUB)
        try:
            # An empty prefix subscribes to every message on the SUB socket.
            iopub_stream.socket.setsockopt(zmq.SUBSCRIBE, b'')
            shell_stream = self.create_connected_stream(ports['shell_port'], zmq.XREQ)
        except Exception:
            # Do not leak the iopub stream when the second half of the
            # setup fails; the manager stays in the stopped state.
            iopub_stream.close()
            raise
        self._session_streams = dict(
            iopub_stream=iopub_stream,
            shell_stream=shell_stream,
        )
        self._running = True

    def stop(self):
        """Close all streams and mark the manager as stopped.

        Calling this on a manager that is not running does nothing.
        """
        if not self._running:
            return
        for stream in self._session_streams.values():
            stream.close()
        self._session_streams = {}
        self._running = False

    def create_connected_stream(self, port, socket_type):
        """Return a ZMQStream wrapping a new socket connected to the kernel.

        Parameters
        ----------
        port : int
            TCP port on the kernel's IP address to connect to.
        socket_type : int
            ZMQ socket type constant handed to ``self.context.socket``.
        """
        sock = self.context.socket(socket_type)
        try:
            addr = "tcp://%s:%i" % (self.kernel_manager.get_kernel_ip(self.kernel_id), port)
            self.log.info("Connecting to: %s", addr)
            sock.connect(addr)
            return ZMQStream(sock)
        except Exception:
            # The socket is not yet owned by a stream, so close it here
            # rather than leaving it open.
            sock.close()
            raise

    def get_iopub_stream(self):
        """Return the iopub stream; raises KeyError when the manager is stopped."""
        return self._session_streams['iopub_stream']

    def get_shell_stream(self):
        """Return the shell stream; raises KeyError when the manager is stopped."""
        return self._session_streams['shell_stream']