##// END OF EJS Templates
Different clients now share a single zmq session....
Brian Granger -
Show More
@@ -1,5 +1,6
1 1 import signal
2 2 import sys
3 import uuid
3 4
4 5 from IPython.zmq.ipkernel import launch_kernel
5 6 from session import SessionManager
@@ -30,9 +31,8 class KernelManager(object):
30 31 else:
31 32 return False
32 33
33 def start_kernel(self, kernel_id):
34 if kernel_id in self._kernels:
35 raise DuplicateKernelError("Kernel already exists: %s" % kernel_id)
34 def start_kernel(self):
35 kernel_id = str(uuid.uuid4())
36 36 (process, shell_port, iopub_port, stdin_port, hb_port) = launch_kernel(pylab='inline')
37 37 d = dict(
38 38 process = process,
@@ -3,6 +3,8 import json
3 3 import logging
4 4 import os
5 5 import urllib
6 import uuid
7 from Queue import Queue
6 8
7 9 import zmq
8 10
@@ -21,8 +23,7 from kernelmanager import KernelManager
21 23
22 24 options.define("port", default=8888, help="run on the given port", type=int)
23 25
24 _session_id_regex = r"(?P<session_id>\w+-\w+-\w+-\w+-\w+)"
25 _kernel_id_regex = r"(?P<kernel_id>\w+)"
26 _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
26 27
27 28
28 29 class MainHandler(web.RequestHandler):
@@ -30,79 +31,79 class MainHandler(web.RequestHandler):
30 31 self.render('notebook.html')
31 32
32 33
33 class BaseKernelHandler(object):
34
35 def get_kernel(self):
36 return self.application.kernel_manager
37
38 def get_session(self, kernel_id):
39 km = self.get_kernel()
40 sm = km.get_session_manager(kernel_id)
41 return sm
42
43
44 class KernelHandler(web.RequestHandler, BaseKernelHandler):
34 class KernelHandler(web.RequestHandler):
45 35
46 36 def get(self):
47 self.write(json.dumps(self.get_kernel().kernel_ids))
37 self.write(json.dumps(self.application.kernel_ids))
48 38
49 def post(self, *args, **kwargs):
50 kernel_id = kwargs['kernel_id']
51 self.get_kernel().start_kernel(kernel_id)
52 logging.info("Starting kernel: %s" % kernel_id)
39 def post(self):
40 kernel_id = self.application.start_kernel()
41 self.application.start_session(kernel_id)
53 42 self.write(json.dumps(kernel_id))
54 43
55 44
56 class SessionHandler(web.RequestHandler, BaseKernelHandler):
45 class ZMQStreamRouter(object):
57 46
58 def get(self, *args, **kwargs):
59 kernel_id = kwargs['kernel_id']
60 self.write(json.dumps(self.get_session(kernel_id).session_ids))
47 def __init__(self, zmq_stream):
48 self.zmq_stream = zmq_stream
49 self._clients = {}
50 self.zmq_stream.on_recv(self._on_zmq_reply)
61 51
62 def post(self, *args, **kwargs):
63 kernel_id = kwargs['kernel_id']
64 sm = self.get_session(kernel_id)
65 session_id = sm.start_session()
66 logging.info("Starting session: %s, %s" % (kernel_id, session_id))
67 self.write(json.dumps(session_id))
52 def register_client(self, client):
53 client_id = uuid.uuid4()
54 self._clients[client_id] = client
55 return client_id
68 56
57 def unregister_client(self, client_id):
58 del self._clients[client_id]
69 59
70 class ZMQStreamHandler(websocket.WebSocketHandler, BaseKernelHandler):
71 60
72 stream_name = ''
61 class IOPubStreamRouter(ZMQStreamRouter):
73 62
74 def open(self, *args, **kwargs):
75 kernel_id = kwargs['kernel_id']
76 session_id = kwargs['session_id']
77 logging.info("Connection open: %s, %s" % (kernel_id,session_id))
78 sm = self.get_session(kernel_id)
79 method_name = "get_%s_stream" % self.stream_name
80 method = getattr(sm, method_name)
81 self.zmq_stream = method(session_id)
82 self.zmq_stream.on_recv(self._on_zmq_reply)
63 def _on_zmq_reply(self, msg_list):
64 for client_id, client in self._clients.items():
65 for msg in msg_list:
66 client.write_message(msg)
83 67
84 def on_message(self, msg):
85 logging.info("Message received: %r, %r" % (msg, self.__class__))
86 logging.info(self.zmq_stream)
87 self.zmq_stream.send_unicode(msg)
68 def forward_unicode(self, client_id, msg):
69 # This is a SUB stream that we should never write to.
70 pass
88 71
89 def on_close(self):
90 self.zmq_stream.close()
72
73 class ShellStreamRouter(ZMQStreamRouter):
74
75 def __init__(self, zmq_stream):
76 ZMQStreamRouter.__init__(self, zmq_stream)
77 self._request_queue = Queue()
91 78
92 79 def _on_zmq_reply(self, msg_list):
93 for msg in msg_list:
94 logging.info("Message reply: %r" % msg)
95 self.write_message(msg)
80 client_id = self._request_queue.get(block=False)
81 client = self._clients.get(client_id)
82 if client is not None:
83 for msg in msg_list:
84 client.write_message(msg)
85
86 def forward_unicode(self, client_id, msg):
87 self._request_queue.put(client_id)
88 self.zmq_stream.send_unicode(msg)
96 89
97 90
98 class IOPubStreamHandler(ZMQStreamHandler):
91 class ZMQStreamHandler(websocket.WebSocketHandler):
99 92
100 stream_name = 'iopub'
93 def initialize(self, stream_name):
94 self.stream_name = stream_name
101 95
96 def open(self, kernel_id):
97 self.router = self.application.get_router(kernel_id, self.stream_name)
98 self.client_id = self.router.register_client(self)
99 logging.info("Connection open: %s, %s" % (kernel_id, self.client_id))
102 100
103 class ShellStreamHandler(ZMQStreamHandler):
101 def on_message(self, msg):
102 self.router.forward_unicode(self.client_id, msg)
104 103
105 stream_name = 'shell'
104 def on_close(self):
105 self.router.unregister_client(self.client_id)
106 logging.info("Connection closed: %s" % self.client_id)
106 107
107 108
108 109 class NotebookRootHandler(web.RequestHandler):
@@ -157,10 +158,9 class NotebookApplication(web.Application):
157 158 def __init__(self):
158 159 handlers = [
159 160 (r"/", MainHandler),
160 (r"/kernels/%s" % (_kernel_id_regex,), KernelHandler),
161 (r"/kernels/%s/sessions" % (_kernel_id_regex,), SessionHandler),
162 (r"/kernels/%s/sessions/%s/iopub" % (_kernel_id_regex,_session_id_regex), IOPubStreamHandler),
163 (r"/kernels/%s/sessions/%s/shell" % (_kernel_id_regex,_session_id_regex), ShellStreamHandler),
161 (r"/kernels", KernelHandler),
162 (r"/kernels/%s/iopub" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='iopub')),
163 (r"/kernels/%s/shell" % _kernel_id_regex, ZMQStreamHandler, dict(stream_name='shell')),
164 164 (r"/notebooks", NotebookRootHandler),
165 165 (r"/notebooks/([^/]+)", NotebookHandler)
166 166 ]
@@ -169,8 +169,46 class NotebookApplication(web.Application):
169 169 static_path=os.path.join(os.path.dirname(__file__), "static"),
170 170 )
171 171 web.Application.__init__(self, handlers, **settings)
172
172 173 self.context = zmq.Context()
173 174 self.kernel_manager = KernelManager(self.context)
175 self._session_dict = {}
176 self._routers = {}
177
178 #-------------------------------------------------------------------------
179 # Methods for managing kernels and sessions
180 #-------------------------------------------------------------------------
181
182 @property
183 def kernel_ids(self):
184 return self.kernel_manager.kernel_ids
185
186 def start_kernel(self):
187 kernel_id = self.kernel_manager.start_kernel()
188 logging.info("Kernel started: %s" % kernel_id)
189 return kernel_id
190
191 def start_session(self, kernel_id):
192 sm = self.kernel_manager.get_session_manager(kernel_id)
193 session_id = sm.start_session()
194 self._session_dict[kernel_id] = session_id
195 iopub_stream = sm.get_iopub_stream(session_id)
196 shell_stream = sm.get_shell_stream(session_id)
197 iopub_router = IOPubStreamRouter(iopub_stream)
198 shell_router = ShellStreamRouter(shell_stream)
199 self._routers[(kernel_id, session_id, 'iopub')] = iopub_router
200 self._routers[(kernel_id, session_id, 'shell')] = shell_router
201 logging.info("Session started: %s, %s" % (kernel_id, session_id))
202
203 def stop_session(self, kernel_id):
204 # TODO: finish this!
205 sm = self.kernel_manager.get_session_manager(kernel_id)
206 session_id = self._session_dict[kernel_id]
207
208 def get_router(self, kernel_id, stream_name):
209 session_id = self._session_dict[kernel_id]
210 router = self._routers[(kernel_id, session_id, stream_name)]
211 return router
174 212
175 213
176 214 def main():
@@ -92,7 +92,6 var Notebook = function (selector) {
92 92 this.element.scroll();
93 93 this.element.data("notebook", this);
94 94 this.next_prompt_number = 1;
95 this.next_kernel_number = 0;
96 95 this.kernel = null;
97 96 this.msg_cell_map = {};
98 97 this.bind_events();
@@ -429,20 +428,13 Notebook.prototype.expand = function (index) {
429 428 // Kernel related things
430 429
431 430 Notebook.prototype.start_kernel = function () {
432 this.kernel = new Kernel("kernel" + this.next_kernel_number);
433 this.next_kernel_number = this.next_kernel_number + 1;
431 this.kernel = new Kernel();
434 432 this.kernel.start_kernel(this._kernel_started, this);
435 433 };
436 434
437 435
438 436 Notebook.prototype._kernel_started = function () {
439 437 console.log("Kernel started: ", this.kernel.kernel_id);
440 this.kernel.start_session(this._session_started, this);
441 };
442
443
444 Notebook.prototype._session_started = function () {
445 console.log("Session started: ", this.kernel.session_id);
446 438 var that = this;
447 439
448 440 this.kernel.shell_channel.onmessage = function (e) {
@@ -711,11 +703,10 TextCell.prototype.config_mathjax = function () {
711 703 //============================================================================
712 704
713 705
714 var Kernel = function (kernel_id) {
715 this.kernel_id = kernel_id;
706 var Kernel = function () {
707 this.kernel_id = null;
716 708 this.base_url = "/kernels";
717 this.kernel_url = this.base_url + "/" + this.kernel_id
718 this.session_id = null;
709 this.kernel_url = null;
719 710 };
720 711
721 712
@@ -734,32 +725,26 Kernel.prototype.get_msg = function (msg_type, content) {
734 725 }
735 726
736 727 Kernel.prototype.start_kernel = function (callback, context) {
737 $.post(this.kernel_url, function () {
738 callback.call(context);
739 });
740 };
741
742
743 Kernel.prototype.start_session = function (callback, context) {
744 728 var that = this;
745 $.post(this.kernel_url + "/sessions",
746 function (session_id) {
747 that._handle_start_session(session_id, callback, context);
748 },
749 'json');
750 }
729 $.post(this.base_url,
730 function (kernel_id) {
731 that._handle_start_kernel(kernel_id, callback, context);
732 },
733 'json'
734 );
735 };
751 736
752 737
753 Kernel.prototype._handle_start_session = function (session_id, callback, context) {
754 this.session_id = session_id;
755 this.session_url = this.kernel_url + "/sessions/" + this.session_id;
738 Kernel.prototype._handle_start_kernel = function (kernel_id, callback, context) {
739 this.kernel_id = kernel_id;
740 this.kernel_url = this.base_url + "/" + this.kernel_id;
756 741 this._start_channels();
757 742 callback.call(context);
758 743 };
759 744
760 745
761 746 Kernel.prototype._start_channels = function () {
762 var ws_url = "ws://127.0.0.1:8888" + this.session_url;
747 var ws_url = "ws://127.0.0.1:8888" + this.kernel_url;
763 748 this.shell_channel = new WebSocket(ws_url + "/shell");
764 749 this.iopub_channel = new WebSocket(ws_url + "/iopub");
765 750 }
General Comments 0
You need to be logged in to leave comments. Login now