From 2ad3ee4100f2a25c7f938f53ffc6c10116d783fc 2011-07-21 03:42:32
From: Brian Granger <ellisonbg@gmail.com>
Date: 2011-07-21 03:42:32
Subject: [PATCH] Different clients now share a single zmq session.

Previously, each client (browser window) would open its own set
of ZMQ sockets to a kernel. Now one master set of connections
to the kernel is created and all clients share those connections.
In some ways, this simplifies the URL design.

I have also made kernel_ids server-side created.

---

diff --git a/IPython/frontend/html/notebook/kernelmanager.py b/IPython/frontend/html/notebook/kernelmanager.py
index f4d19c1..fcddef2 100644
--- a/IPython/frontend/html/notebook/kernelmanager.py
+++ b/IPython/frontend/html/notebook/kernelmanager.py
@@ -1,5 +1,6 @@
 import signal
 import sys
+import uuid
 
 from IPython.zmq.ipkernel import launch_kernel
 from session import SessionManager
@@ -30,9 +31,8 @@ class KernelManager(object):
         else:
             return False
 
-    def start_kernel(self, kernel_id):
-        if kernel_id in self._kernels:
-            raise DuplicateKernelError("Kernel already exists: %s" % kernel_id)
+    def start_kernel(self):
+        kernel_id = str(uuid.uuid4())
         (process, shell_port, iopub_port, stdin_port, hb_port) = launch_kernel(pylab='inline')
         d = dict(
             process = process,
diff --git a/IPython/frontend/html/notebook/notebook.py b/IPython/frontend/html/notebook/notebook.py
index 5f48025..d2ddabc 100644
--- a/IPython/frontend/html/notebook/notebook.py
+++ b/IPython/frontend/html/notebook/notebook.py
@@ -3,6 +3,8 @@ import json
 import logging
 import os
 import urllib
+import uuid
+from Queue import Queue
 
 import zmq
 
@@ -21,8 +23,7 @@ from kernelmanager import KernelManager
 
 options.define("port", default=8888, help="run on the given port", type=int)
 
-_session_id_regex = r"(?P<session_id>\w+-\w+-\w+-\w+-\w+)"
-_kernel_id_regex = r"(?P<kernel_id>\w+)"
+_kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
 
 
 class MainHandler(web.RequestHandler):
@@ -30,79 +31,79 @@ class MainHandler(web.RequestHandler):
         self.render('notebook.html')
 
 
-class BaseKernelHandler(object):
-
-    def get_kernel(self):
-        return self.application.kernel_manager
-
-    def get_session(self, kernel_id):
-        km = self.get_kernel()
-        sm = km.get_session_manager(kernel_id)
-        return sm
-
-
-class KernelHandler(web.RequestHandler, BaseKernelHandler):
+class KernelHandler(web.RequestHandler):
 
     def get(self):
-        self.write(json.dumps(self.get_kernel().kernel_ids))
+        self.write(json.dumps(self.application.kernel_ids))
 
-    def post(self, *args, **kwargs):
-        kernel_id = kwargs['kernel_id']
-        self.get_kernel().start_kernel(kernel_id)
-        logging.info("Starting kernel: %s" % kernel_id)
+    def post(self):
+        kernel_id = self.application.start_kernel()
+        self.application.start_session(kernel_id)
         self.write(json.dumps(kernel_id))
 
 
-class SessionHandler(web.RequestHandler, BaseKernelHandler):
+class ZMQStreamRouter(object):
 
-    def get(self, *args, **kwargs):
-        kernel_id = kwargs['kernel_id']
-        self.write(json.dumps(self.get_session(kernel_id).session_ids))
+    def __init__(self, zmq_stream):
+        self.zmq_stream = zmq_stream
+        self._clients = {}
+        self.zmq_stream.on_recv(self._on_zmq_reply)
 
-    def post(self, *args, **kwargs):
-        kernel_id = kwargs['kernel_id']
-        sm = self.get_session(kernel_id)
-        session_id = sm.start_session()
-        logging.info("Starting session: %s, %s" % (kernel_id, session_id))
-        self.write(json.dumps(session_id))
+    def register_client(self, client):
+        client_id = uuid.uuid4()
+        self._clients[client_id] = client
+        return client_id
 
+    def unregister_client(self, client_id):
+        del self._clients[client_id]
 
-class ZMQStreamHandler(websocket.WebSocketHandler, BaseKernelHandler):
 
-    stream_name = ''
+class IOPubStreamRouter(ZMQStreamRouter):
 
-    def open(self, *args, **kwargs):
-        kernel_id = kwargs['kernel_id']
-        session_id = kwargs['session_id']
-        logging.info("Connection open: %s, %s" % (kernel_id,session_id))
-        sm = self.get_session(kernel_id)
-        method_name = "get_%s_stream" % self.stream_name
-        method = getattr(sm, method_name)
-        self.zmq_stream = method(session_id)
-        self.zmq_stream.on_recv(self._on_zmq_reply)
+    def _on_zmq_reply(self, msg_list):
+        for client_id, client in self._clients.items():
+            for msg in msg_list:
+                client.write_message(msg)
 
-    def on_message(self, msg):
-        logging.info("Message received: %r, %r" % (msg, self.__class__))
-        logging.info(self.zmq_stream)
-        self.zmq_stream.send_unicode(msg)
+    def forward_unicode(self, client_id, msg):
+        # This is a SUB stream that we should never write to.
+        pass
 
-    def on_close(self):
-        self.zmq_stream.close()
+
+class ShellStreamRouter(ZMQStreamRouter):
+
+    def __init__(self, zmq_stream):
+        ZMQStreamRouter.__init__(self, zmq_stream)
+        self._request_queue = Queue()
 
     def _on_zmq_reply(self, msg_list):
-        for msg in msg_list:
-            logging.info("Message reply: %r" % msg)
-            self.write_message(msg)
+        client_id = self._request_queue.get(block=False)
+        client = self._clients.get(client_id)
+        if client is not None:
+            for msg in msg_list:
+                client.write_message(msg)
+
+    def forward_unicode(self, client_id, msg):
+        self._request_queue.put(client_id)
+        self.zmq_stream.send_unicode(msg)
 
 
-class IOPubStreamHandler(ZMQStreamHandler):
+class ZMQStreamHandler(websocket.WebSocketHandler):
 
-    stream_name = 'iopub'
+    def initialize(self, stream_name):
+        self.stream_name = stream_name
 
+    def open(self, kernel_id):
+        self.router = self.application.get_router(kernel_id, self.stream_name)
+        self.client_id = self.router.register_client(self)
+        logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
 
-class ShellStreamHandler(ZMQStreamHandler):
+    def on_message(self, msg):
+        self.router.forward_unicode(self.client_id, msg)
 
-    stream_name = 'shell'
+    def on_close(self):
+        self.router.unregister_client(self.client_id)
+        logging.info("Connection closed: %s" % self.client_id)
 
 
 class NotebookRootHandler(web.RequestHandler):
@@ -157,10 +158,9 @@ class NotebookApplication(web.Application):
     def __init__(self):
         handlers = [
             (r"/", MainHandler),
-            (r"/kernels/%s" % (_kernel_id_regex,), KernelHandler),
-            (r"/kernels/%s/sessions" % (_kernel_id_regex,), SessionHandler),
-            (r"/kernels/%s/sessions/%s/iopub" % (_kernel_id_regex,_session_id_regex), IOPubStreamHandler),
-            (r"/kernels/%s/sessions/%s/shell" % (_kernel_id_regex,_session_id_regex), ShellStreamHandler),
+            (r"/kernels", KernelHandler),
+            (r"/kernels/%s/iopub" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='iopub')),
+            (r"/kernels/%s/shell" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='shell')),
             (r"/notebooks", NotebookRootHandler),
             (r"/notebooks/([^/]+)", NotebookHandler)
         ]
@@ -169,8 +169,46 @@ class NotebookApplication(web.Application):
             static_path=os.path.join(os.path.dirname(__file__), "static"),
         )
         web.Application.__init__(self, handlers, **settings)
+
         self.context = zmq.Context()
         self.kernel_manager = KernelManager(self.context)
+        self._session_dict = {}
+        self._routers = {}
+
+    #-------------------------------------------------------------------------
+    # Methods for managing kernels and sessions
+    #-------------------------------------------------------------------------
+
+    @property
+    def kernel_ids(self):
+        return self.kernel_manager.kernel_ids
+
+    def start_kernel(self):
+        kernel_id = self.kernel_manager.start_kernel()
+        logging.info("Kernel started: %s" % kernel_id)
+        return kernel_id
+
+    def start_session(self, kernel_id):
+        sm = self.kernel_manager.get_session_manager(kernel_id)
+        session_id = sm.start_session()
+        self._session_dict[kernel_id] = session_id
+        iopub_stream = sm.get_iopub_stream(session_id)
+        shell_stream = sm.get_shell_stream(session_id)
+        iopub_router = IOPubStreamRouter(iopub_stream)
+        shell_router = ShellStreamRouter(shell_stream)
+        self._routers[(kernel_id, session_id, 'iopub')] = iopub_router
+        self._routers[(kernel_id, session_id, 'shell')] = shell_router
+        logging.info("Session started: %s, %s" % (kernel_id, session_id))
+
+    def stop_session(self, kernel_id):
+        # TODO: finish this!
+        sm = self.kernel_manager.get_session_manager(kernel_id)
+        session_id = self._session_dict[kernel_id]
+
+    def get_router(self, kernel_id, stream_name):
+        session_id = self._session_dict[kernel_id]
+        router = self._routers[(kernel_id, session_id, stream_name)]
+        return router
 
 
 def main():
diff --git a/IPython/frontend/html/notebook/static/js/notebook.js b/IPython/frontend/html/notebook/static/js/notebook.js
index 0ae5e85..205d511 100644
--- a/IPython/frontend/html/notebook/static/js/notebook.js
+++ b/IPython/frontend/html/notebook/static/js/notebook.js
@@ -92,7 +92,6 @@ var Notebook = function (selector) {
     this.element.scroll();
     this.element.data("notebook", this);
     this.next_prompt_number = 1;
-    this.next_kernel_number = 0;
     this.kernel = null;
     this.msg_cell_map = {};
     this.bind_events();
@@ -429,20 +428,13 @@ Notebook.prototype.expand = function (index) {
 // Kernel related things
 
 Notebook.prototype.start_kernel = function () {
-    this.kernel = new Kernel("kernel" + this.next_kernel_number);
-    this.next_kernel_number = this.next_kernel_number + 1;
+    this.kernel = new Kernel();
     this.kernel.start_kernel(this._kernel_started, this);
 };
 
 
 Notebook.prototype._kernel_started = function () {
     console.log("Kernel started: ", this.kernel.kernel_id);
-    this.kernel.start_session(this._session_started, this);
-};
-
-
-Notebook.prototype._session_started = function () {
-    console.log("Session started: ", this.kernel.session_id);
     var that = this;
 
     this.kernel.shell_channel.onmessage = function (e) {
@@ -711,11 +703,10 @@ TextCell.prototype.config_mathjax = function () {
 //============================================================================
 
 
-var Kernel = function (kernel_id) {
-    this.kernel_id = kernel_id;
+var Kernel = function () {
+    this.kernel_id = null;
     this.base_url = "/kernels";
-    this.kernel_url = this.base_url + "/" + this.kernel_id
-    this.session_id = null;
+    this.kernel_url = null;
 };
 
 
@@ -734,32 +725,26 @@ Kernel.prototype.get_msg = function (msg_type, content) {
 }
 
 Kernel.prototype.start_kernel = function (callback, context) {
-    $.post(this.kernel_url, function () {
-        callback.call(context);
-    });
-};
-
-
-Kernel.prototype.start_session = function (callback, context) {
     var that = this;
-    $.post(this.kernel_url + "/sessions",
-        function (session_id) {
-            that._handle_start_session(session_id, callback, context);
-        },
-        'json');
-}
+    $.post(this.base_url,
+        function (kernel_id) {
+            that._handle_start_kernel(kernel_id, callback, context);
+        }, 
+        'json'
+    );
+};
 
 
-Kernel.prototype._handle_start_session = function (session_id, callback, context) {
-    this.session_id = session_id;
-    this.session_url = this.kernel_url + "/sessions/" + this.session_id;
+Kernel.prototype._handle_start_kernel = function (kernel_id, callback, context) {
+    this.kernel_id = kernel_id;
+    this.kernel_url = this.base_url + "/" + this.kernel_id;
     this._start_channels();
     callback.call(context);
 };
 
 
 Kernel.prototype._start_channels = function () {
-    var ws_url = "ws://127.0.0.1:8888" + this.session_url;
+    var ws_url = "ws://127.0.0.1:8888" + this.kernel_url;
     this.shell_channel = new WebSocket(ws_url + "/shell");
     this.iopub_channel = new WebSocket(ws_url + "/iopub");
 }