From 2ad3ee4100f2a25c7f938f53ffc6c10116d783fc 2011-07-21 03:42:32 From: Brian Granger <ellisonbg@gmail.com> Date: 2011-07-21 03:42:32 Subject: [PATCH] Different clients now share a single zmq session. Previously, each client (browser window) would open its own set of ZMQ sockets to a kernel. Now one master set of connections to the kernel is created and all clients share those connections. In some ways, this simplifies the URL design. I have also made kernel_ids server-side created. --- diff --git a/IPython/frontend/html/notebook/kernelmanager.py b/IPython/frontend/html/notebook/kernelmanager.py index f4d19c1..fcddef2 100644 --- a/IPython/frontend/html/notebook/kernelmanager.py +++ b/IPython/frontend/html/notebook/kernelmanager.py @@ -1,5 +1,6 @@ import signal import sys +import uuid from IPython.zmq.ipkernel import launch_kernel from session import SessionManager @@ -30,9 +31,8 @@ class KernelManager(object): else: return False - def start_kernel(self, kernel_id): - if kernel_id in self._kernels: - raise DuplicateKernelError("Kernel already exists: %s" % kernel_id) + def start_kernel(self): + kernel_id = str(uuid.uuid4()) (process, shell_port, iopub_port, stdin_port, hb_port) = launch_kernel(pylab='inline') d = dict( process = process, diff --git a/IPython/frontend/html/notebook/notebook.py b/IPython/frontend/html/notebook/notebook.py index 5f48025..d2ddabc 100644 --- a/IPython/frontend/html/notebook/notebook.py +++ b/IPython/frontend/html/notebook/notebook.py @@ -3,6 +3,8 @@ import json import logging import os import urllib +import uuid +from Queue import Queue import zmq @@ -21,8 +23,7 @@ from kernelmanager import KernelManager options.define("port", default=8888, help="run on the given port", type=int) -_session_id_regex = r"(?P<session_id>\w+-\w+-\w+-\w+-\w+)" -_kernel_id_regex = r"(?P<kernel_id>\w+)" +_kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)" class MainHandler(web.RequestHandler): @@ -30,79 +31,79 @@ class MainHandler(web.RequestHandler): self.render('notebook.html') -class BaseKernelHandler(object): - - def get_kernel(self): - return self.application.kernel_manager - - def get_session(self, kernel_id): - km = self.get_kernel() - sm = km.get_session_manager(kernel_id) - return sm - - -class KernelHandler(web.RequestHandler, BaseKernelHandler): +class KernelHandler(web.RequestHandler): def get(self): - self.write(json.dumps(self.get_kernel().kernel_ids)) + self.write(json.dumps(self.application.kernel_ids)) - def post(self, *args, **kwargs): - kernel_id = kwargs['kernel_id'] - self.get_kernel().start_kernel(kernel_id) - logging.info("Starting kernel: %s" % kernel_id) + def post(self): + kernel_id = self.application.start_kernel() + self.application.start_session(kernel_id) self.write(json.dumps(kernel_id)) -class SessionHandler(web.RequestHandler, BaseKernelHandler): +class ZMQStreamRouter(object): - def get(self, *args, **kwargs): - kernel_id = kwargs['kernel_id'] - self.write(json.dumps(self.get_session(kernel_id).session_ids)) + def __init__(self, zmq_stream): + self.zmq_stream = zmq_stream + self._clients = {} + self.zmq_stream.on_recv(self._on_zmq_reply) - def post(self, *args, **kwargs): - kernel_id = kwargs['kernel_id'] - sm = self.get_session(kernel_id) - session_id = sm.start_session() - logging.info("Starting session: %s, %s" % (kernel_id, session_id)) - self.write(json.dumps(session_id)) + def register_client(self, client): + client_id = uuid.uuid4() + self._clients[client_id] = client + return client_id + def unregister_client(self, client_id): + del self._clients[client_id] -class ZMQStreamHandler(websocket.WebSocketHandler, BaseKernelHandler): - stream_name = '' +class IOPubStreamRouter(ZMQStreamRouter): - def open(self, *args, **kwargs): - kernel_id = kwargs['kernel_id'] - session_id = kwargs['session_id'] - logging.info("Connection open: %s, %s" % (kernel_id,session_id)) - sm = self.get_session(kernel_id) - method_name = "get_%s_stream" % self.stream_name - method = getattr(sm, method_name) - self.zmq_stream = method(session_id) - self.zmq_stream.on_recv(self._on_zmq_reply) + def _on_zmq_reply(self, msg_list): + for client_id, client in self._clients.items(): + for msg in msg_list: + client.write_message(msg) - def on_message(self, msg): - logging.info("Message received: %r, %r" % (msg, self.__class__)) - logging.info(self.zmq_stream) - self.zmq_stream.send_unicode(msg) + def forward_unicode(self, client_id, msg): + # This is a SUB stream that we should never write to. + pass - def on_close(self): - self.zmq_stream.close() + +class ShellStreamRouter(ZMQStreamRouter): + + def __init__(self, zmq_stream): + ZMQStreamRouter.__init__(self, zmq_stream) + self._request_queue = Queue() def _on_zmq_reply(self, msg_list): - for msg in msg_list: - logging.info("Message reply: %r" % msg) - self.write_message(msg) + client_id = self._request_queue.get(block=False) + client = self._clients.get(client_id) + if client is not None: + for msg in msg_list: + client.write_message(msg) + + def forward_unicode(self, client_id, msg): + self._request_queue.put(client_id) + self.zmq_stream.send_unicode(msg) -class IOPubStreamHandler(ZMQStreamHandler): +class ZMQStreamHandler(websocket.WebSocketHandler): - stream_name = 'iopub' + def initialize(self, stream_name): + self.stream_name = stream_name + def open(self, kernel_id): + self.router = self.application.get_router(kernel_id, self.stream_name) + self.client_id = self.router.register_client(self) + logging.info("Connection open: %s, %s" % (kernel_id, self.client_id)) -class ShellStreamHandler(ZMQStreamHandler): + def on_message(self, msg): + self.router.forward_unicode(self.client_id, msg) - stream_name = 'shell' + def on_close(self): + self.router.unregister_client(self.client_id) + logging.info("Connection closed: %s" % self.client_id) class NotebookRootHandler(web.RequestHandler): @@ -157,10 +158,9 @@ class NotebookApplication(web.Application): def __init__(self): handlers = [ (r"/", MainHandler), - (r"/kernels/%s" % (_kernel_id_regex,), KernelHandler), - (r"/kernels/%s/sessions" % (_kernel_id_regex,), SessionHandler), - (r"/kernels/%s/sessions/%s/iopub" % (_kernel_id_regex,_session_id_regex), IOPubStreamHandler), - (r"/kernels/%s/sessions/%s/shell" % (_kernel_id_regex,_session_id_regex), ShellStreamHandler), + (r"/kernels", KernelHandler), + (r"/kernels/%s/iopub" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='iopub')), + (r"/kernels/%s/shell" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='shell')), (r"/notebooks", NotebookRootHandler), (r"/notebooks/([^/]+)", NotebookHandler) ] @@ -169,8 +169,46 @@ class NotebookApplication(web.Application): static_path=os.path.join(os.path.dirname(__file__), "static"), ) web.Application.__init__(self, handlers, **settings) + self.context = zmq.Context() self.kernel_manager = KernelManager(self.context) + self._session_dict = {} + self._routers = {} + + #------------------------------------------------------------------------- + # Methods for managing kernels and sessions + #------------------------------------------------------------------------- + + @property + def kernel_ids(self): + return self.kernel_manager.kernel_ids + + def start_kernel(self): + kernel_id = self.kernel_manager.start_kernel() + logging.info("Kernel started: %s" % kernel_id) + return kernel_id + + def start_session(self, kernel_id): + sm = self.kernel_manager.get_session_manager(kernel_id) + session_id = sm.start_session() + self._session_dict[kernel_id] = session_id + iopub_stream = sm.get_iopub_stream(session_id) + shell_stream = sm.get_shell_stream(session_id) + iopub_router = IOPubStreamRouter(iopub_stream) + shell_router = ShellStreamRouter(shell_stream) + self._routers[(kernel_id, session_id, 'iopub')] = iopub_router + self._routers[(kernel_id, session_id, 'shell')] = shell_router + logging.info("Session started: %s, %s" % (kernel_id, session_id)) + + def stop_session(self, kernel_id): + # TODO: finish this! + sm = self.kernel_manager.get_session_manager(kernel_id) + session_id = self._session_dict[kernel_id] + + def get_router(self, kernel_id, stream_name): + session_id = self._session_dict[kernel_id] + router = self._routers[(kernel_id, session_id, stream_name)] + return router def main(): diff --git a/IPython/frontend/html/notebook/static/js/notebook.js b/IPython/frontend/html/notebook/static/js/notebook.js index 0ae5e85..205d511 100644 --- a/IPython/frontend/html/notebook/static/js/notebook.js +++ b/IPython/frontend/html/notebook/static/js/notebook.js @@ -92,7 +92,6 @@ var Notebook = function (selector) { this.element.scroll(); this.element.data("notebook", this); this.next_prompt_number = 1; - this.next_kernel_number = 0; this.kernel = null; this.msg_cell_map = {}; this.bind_events(); @@ -429,20 +428,13 @@ Notebook.prototype.expand = function (index) { // Kernel related things Notebook.prototype.start_kernel = function () { - this.kernel = new Kernel("kernel" + this.next_kernel_number); - this.next_kernel_number = this.next_kernel_number + 1; + this.kernel = new Kernel(); this.kernel.start_kernel(this._kernel_started, this); }; Notebook.prototype._kernel_started = function () { console.log("Kernel started: ", this.kernel.kernel_id); - this.kernel.start_session(this._session_started, this); -}; - - -Notebook.prototype._session_started = function () { - console.log("Session started: ", this.kernel.session_id); var that = this; this.kernel.shell_channel.onmessage = function (e) { @@ -711,11 +703,10 @@ TextCell.prototype.config_mathjax = function () { //============================================================================ -var Kernel = function (kernel_id) { - this.kernel_id = kernel_id; +var Kernel = function () { + this.kernel_id = null; this.base_url = "/kernels"; - this.kernel_url = this.base_url + "/" + this.kernel_id - this.session_id = null; + this.kernel_url = null; }; @@ -734,32 +725,26 @@ Kernel.prototype.get_msg = function (msg_type, content) { } Kernel.prototype.start_kernel = function (callback, context) { - $.post(this.kernel_url, function () { - callback.call(context); - }); -}; - - -Kernel.prototype.start_session = function (callback, context) { var that = this; - $.post(this.kernel_url + "/sessions", - function (session_id) { - that._handle_start_session(session_id, callback, context); - }, - 'json'); -} + $.post(this.base_url, + function (kernel_id) { + that._handle_start_kernel(kernel_id, callback, context); + }, + 'json' + ); +}; -Kernel.prototype._handle_start_session = function (session_id, callback, context) { - this.session_id = session_id; - this.session_url = this.kernel_url + "/sessions/" + this.session_id; +Kernel.prototype._handle_start_kernel = function (kernel_id, callback, context) { + this.kernel_id = kernel_id; + this.kernel_url = this.base_url + "/" + this.kernel_id; this._start_channels(); callback.call(context); }; Kernel.prototype._start_channels = function () { - var ws_url = "ws://127.0.0.1:8888" + this.session_url; + var ws_url = "ws://127.0.0.1:8888" + this.kernel_url; this.shell_channel = new WebSocket(ws_url + "/shell"); this.iopub_channel = new WebSocket(ws_url + "/iopub"); }